A Walmart search URL looks like this:

'https://www.walmart.com/search?q=ipad&sort=best_seller&page=1&affinityOverride=default'

- q stands for the search query. In our case, q=ipad. Note: if you want to search for a keyword that contains spaces or special characters, remember that you need to encode this value.
- sort stands for the sorting order of the query. In our case we used sort=best_seller, however other options are best_match, price_low and price_high.
- page stands for the page number. In our case, we've requested page=1.

To retrieve more products for the same keyword, you can also run the query with the other sort orders (sort=price_low and sort=price_high) and then filter the combined results for the unique values.
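As a rough illustration of that idea (a minimal sketch, not from the article), you could build one search URL per sort order with urllib.parse.urlencode and deduplicate whatever product URLs the pages return:

```python
from urllib.parse import urlencode

keyword = 'ipad'
sort_options = ['best_seller', 'best_match', 'price_low', 'price_high']

# Build one search URL per sort order; urlencode also handles keywords
# that contain spaces or special characters.
search_urls = [
    'https://www.walmart.com/search?' + urlencode({'q': keyword, 'sort': sort, 'page': 1})
    for sort in sort_options
]

# After scraping each URL, a set() is an easy way to keep only the unique product URLs.
unique_product_urls = set()
```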
Walmart returns its search results as JSON embedded inside the <script id="__NEXT_DATA__" type="application/json"> tag, so all we need to do is find this tag and parse it into JSON.

```html
<script id="__NEXT_DATA__" type="application/json" nonce="">"{ ...DATA... }"</script>
```
```python
product_list = json_blob["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]
```
```python
import json
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlencode

def create_walmart_product_url(product):
    return 'https://www.walmart.com' + product.get('canonicalUrl', '').split('?')[0]

headers = {"User-Agent": "Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"}
product_url_list = []

## Walmart Search Keyword
keyword = 'ipad'

## Loop Through Walmart Pages Until No More Products
for page in range(1, 5):
    try:
        payload = {'q': keyword, 'sort': 'best_seller', 'page': page, 'affinityOverride': 'default'}
        walmart_search_url = 'https://www.walmart.com/search?' + urlencode(payload)
        response = requests.get(walmart_search_url, headers=headers)

        if response.status_code == 200:
            html_response = response.text
            soup = BeautifulSoup(html_response, "html.parser")
            script_tag = soup.find("script", {"id": "__NEXT_DATA__"})

            if script_tag is not None:
                json_blob = json.loads(script_tag.get_text())
                product_list = json_blob["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]
                product_urls = [create_walmart_product_url(product) for product in product_list]
                product_url_list.extend(product_urls)

                if len(product_urls) == 0:
                    break
    except Exception as e:
        print('Error', e)

print(product_url_list)
```
[ "https://www.walmart.com/ip/2021-Apple-10-2-inch-iPad-Wi-Fi-64GB-Space-Gray-9th-Generation/483978365", "https://www.walmart.com/ip/2021-Apple-iPad-Mini-Wi-Fi-64GB-Purple-6th-Generation/996045822", "https://www.walmart.com/ip/2022-Apple-10-9-inch-iPad-Air-Wi-Fi-64GB-Purple-5th-Generation/860872590", "https://www.walmart.com/ip/2021-Apple-11-inch-iPad-Pro-Wi-Fi-128GB-Space-Gray-3rd-Generation/354993710", "https://www.walmart.com/ip/2021-Apple-12-9-inch-iPad-Pro-Wi-Fi-128GB-Space-Gray-5th-Generation/774697337", "https://www.walmart.com/ip/2020-Apple-10-9-inch-iPad-Air-Wi-Fi-64GB-Sky-Blue-4th-Generation/462727496", "https://www.walmart.com/ip/2021-Apple-iPad-Mini-Wi-Fi-Cellular-64GB-Starlight-6th-Generation/406091219", "https://www.walmart.com/ip/2020-Apple-10-9-inch-iPad-Air-Wi-Fi-Cellular-64GB-Silver-4th-Generation/470306039", "https://www.walmart.com/ip/2022-Apple-10-9-inch-iPad-Air-Wi-Fi-Cellular-64GB-Blue-5th-Generation/234669711", "https://www.walmart.com/ip/2021-Apple-10-2-inch-iPad-Wi-Fi-Cellular-64GB-Space-Gray-9th-Generation/414515010", "https://www.walmart.com/ip/2021-Apple-11-inch-iPad-Pro-Wi-Fi-Cellular-128GB-Space-Gray-3rd-Generation/851470965", "https://www.walmart.com/ip/2021-Apple-12-9-inch-iPad-Pro-Wi-Fi-Cellular-256GB-Space-Gray-5th-Generation/169993514" ]
```python
def extract_product_data(product):
    return {
        'url': create_walmart_product_url(product),
        'name': product.get('name', ''),
        'description': product.get('description', ''),
        'image_url': product.get('image', ''),
        'average_rating': product['rating'].get('averageRating'),
        'number_reviews': product['rating'].get('numberOfReviews'),
    }

product_data_list = [extract_product_data(product) for product in product_list]
```
<script id="__NEXT_DATA__" type="application/json">
tag in the HTML response it is pretty easy to extract the data.<script id="__NEXT_DATA__" type="application/json" nonce="">"{ ...DATA... }"</script>
```python
product_data = json_blob["props"]["pageProps"]["initialData"]["data"]["product"]
product_reviews = json_blob["props"]["pageProps"]["initialData"]["data"]["reviews"]
```
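As a rough sketch (assuming json_blob has already been parsed from the __NEXT_DATA__ script tag, and that the keys match the paths above), you could pull both objects like this:

```python
# Hypothetical illustration: json_blob is the parsed __NEXT_DATA__ payload.
product_data = json_blob["props"]["pageProps"]["initialData"]["data"]["product"]
product_reviews = json_blob["props"]["pageProps"]["initialData"]["data"]["reviews"]

print(product_data.get("name"))

# The reviews object typically contains a customerReviews list; the exact
# fields can vary, so .get() calls are used defensively here.
for review in product_reviews.get("customerReviews", []):
    print(review.get("userNickname"), review.get("rating"), review.get("reviewText"))
```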
```python
import json
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlencode

headers = {"User-Agent": "Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"}

product_data_list = []

## Loop Through Walmart Product URL List
for url in product_url_list:
    try:
        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            html_response = response.text
            soup = BeautifulSoup(html_response, "html.parser")
            script_tag = soup.find("script", {"id": "__NEXT_DATA__"})

            if script_tag is not None:
                json_blob = json.loads(script_tag.get_text())
                raw_product_data = json_blob["props"]["pageProps"]["initialData"]["data"]["product"]
                product_data_list.append({
                    'id': raw_product_data.get('id'),
                    'type': raw_product_data.get('type'),
                    'name': raw_product_data.get('name'),
                    'brand': raw_product_data.get('brand'),
                    'averageRating': raw_product_data.get('averageRating'),
                    'manufacturerName': raw_product_data.get('manufacturerName'),
                    'shortDescription': raw_product_data.get('shortDescription'),
                    'thumbnailUrl': raw_product_data['imageInfo'].get('thumbnailUrl'),
                    'price': raw_product_data['priceInfo']['currentPrice'].get('price'),
                    'currencyUnit': raw_product_data['priceInfo']['currentPrice'].get('currencyUnit'),
                })
    except Exception as e:
        print('Error', e)

print(product_data_list)
```
[ { "id": "4SR8VU90LQ0P", "type": "Tablet Computers", "name": "2021 Apple 10.2-inch iPad Wi-Fi 64GB - Space Gray (9th Generation)", "brand": "Apple", "averageRating": 4.7, "manufacturerName": "Apple", "shortDescription": "Powerful. Easy to use. Versatile. The new iPad has a beautiful 10.2-inch Retina display, powerful A13 Bionic chip, an Ultra Wide front camera with Center Stage, and works with Apple Pencil and the Smart Keyboard. iPad lets you do more, more easily. All for an incredible value.<p></p>", "thumbnailUrl": "https://i5.walmartimages.com/asr/86cda84e-4f55-4ffa-954e-9ca5ae27b723.8a72a9690e1951f535eed412cc9e5fc3.jpeg", "price": 299, "currencyUnit": "USD" },]
```python
SCRAPEOPS_API_KEY = 'YOUR_API_KEY'

def scrapeops_url(url):
    payload = {'api_key': SCRAPEOPS_API_KEY, 'url': url, 'country': 'us'}
    proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
    return proxy_url

walmart_url = 'https://www.walmart.com/search?q=ipad&sort=best_seller&page=1&affinityOverride=default'

## Send URL To ScrapeOps Instead of Walmart
response = requests.get(scrapeops_url(walmart_url))
```
```python
import json
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlencode

SCRAPEOPS_API_KEY = 'YOUR_API_KEY'

def scrapeops_url(url):
    payload = {'api_key': SCRAPEOPS_API_KEY, 'url': url, 'country': 'us'}
    proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
    return proxy_url

product_data_list = []

## Loop Through Walmart Product URL List
for url in product_url_list:
    try:
        response = requests.get(scrapeops_url(url))

        if response.status_code == 200:
            html_response = response.text
            soup = BeautifulSoup(html_response, "html.parser")
            script_tag = soup.find("script", {"id": "__NEXT_DATA__"})

            if script_tag is not None:
                json_blob = json.loads(script_tag.get_text())
                raw_product_data = json_blob["props"]["pageProps"]["initialData"]["data"]["product"]
                product_data_list.append({
                    'id': raw_product_data.get('id'),
                    'type': raw_product_data.get('type'),
                    'name': raw_product_data.get('name'),
                    'brand': raw_product_data.get('brand'),
                    'averageRating': raw_product_data.get('averageRating'),
                    'manufacturerName': raw_product_data.get('manufacturerName'),
                    'shortDescription': raw_product_data.get('shortDescription'),
                    'thumbnailUrl': raw_product_data['imageInfo'].get('thumbnailUrl'),
                    'price': raw_product_data['priceInfo']['currentPrice'].get('price'),
                    'currencyUnit': raw_product_data['priceInfo']['currentPrice'].get('currencyUnit'),
                })
    except Exception as e:
        print('Error', e)

print(product_data_list)
```
To run our scraper, first create a config.json file with your ScrapeOps API key. Then run the script with python name_of_your_file.py.
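The scripts below read the key from that file, so config.json only needs an api_key entry. Here is a minimal sketch of one way to create it (the filename and key name match what the code expects; the key value is a placeholder):

```python
import json

# Write a placeholder ScrapeOps API key into config.json.
# Replace "YOUR-SCRAPEOPS-API-KEY" with your real key.
with open("config.json", "w") as config_file:
    json.dump({"api_key": "YOUR-SCRAPEOPS-API-KEY"}, config_file)
```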
Here is the full production code:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
    name: str = ""
    author_id: str = ""
    rating: int = 0
    date: str = ""
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page {page_number}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                if item["__typename"] != "Product":
                    continue
                name = item.get("name")
                product_id = item["usItemId"]
                if not name:
                    continue
                link = f"https://www.walmart.com/reviews/product/{product_id}"
                price = item["price"]
                sponsored = item["isSponsoredFlag"]
                rating = item["averageRating"]

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=link,
                    sponsored=sponsored,
                    price=price,
                    product_id=product_id
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv")

                script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
                json_data = json.loads(script_tag.text)
                review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"]

                for review in review_list:
                    name = review["userNickname"]
                    author_id = review["authorId"]
                    rating = review["rating"]
                    date = review["reviewSubmissionTime"]
                    review = review["reviewText"]

                    review_data = ReviewData(
                        name=name,
                        author_id=author_id,
                        rating=rating,
                        date=date,
                        review=review
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
You can tweak the crawl by changing any of the following constants in main:

- MAX_RETRIES: Defines the maximum number of times the script will retry fetching a webpage if a request fails due to issues such as network timeouts or non-200 HTTP responses.
- MAX_THREADS: Sets the maximum number of threads that will be used concurrently while scraping.
- PAGES: The number of search result pages to scrape for each keyword.
- LOCATION: The location or country code where the products or reviews will be scraped from.
- keyword_list: A list of product keywords to search for on Walmart's website (e.g., ["laptop"]).

Our search URLs are built in this format:

https://www.walmart.com/search?q={formatted_keyword}

And each product's reviews are fetched from:

https://www.walmart.com/reviews/product/{product_id}
To paginate our results, we use the page parameter. If we want to view page 1, our URL would contain page=1. Our fully paginated URL would be:

https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}

We use page_number+1 because Python's range() function begins counting at 0.

For geolocation, we pass the country param when talking to the ScrapeOps Proxy API. When we pass this parameter, ScrapeOps will route us through the country of our choosing. To appear in the US, we pass "country": "us". To appear in the UK, we pass "country": "uk".
To set up the project, create a new folder, add a virtual environment, and install the dependencies:

```bash
mkdir walmart-scraper
cd walmart-scraper

python -m venv venv
source venv/bin/activate

pip install requests
pip install beautifulsoup4
```
We'll start by building a basic parsing function, scrape_search_results(). Here's the code we'll start with.

```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.walmart.com/search?q={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                if item["__typename"] != "Product":
                    continue
                name = item.get("name")
                product_id = item["usItemId"]
                if not name:
                    continue
                link = f"https://www.walmart.com/reviews/product/{product_id}"
                price = item["price"]
                sponsored = item["isSponsoredFlag"]
                rating = item["averageRating"]

                search_data = {
                    "name": name,
                    "stars": rating,
                    "url": link,
                    "sponsored": sponsored,
                    "price": price,
                    "product_id": product_id
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
```
soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
finds the JSON.json.loads()
to convert the text into a JSON object.json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]
is used to access our item list."name"
: the name of the product."stars"
: the overall rating for the product."url"
: the link to the product's reviews."sponsored"
: whether or not it is a sponsored item, basically an ad."price"
: the price of the item."product_id"
: the unique number assigned to each product on the site.page
To paginate our results, we add a page parameter to our URL. Our URLs will now look like this. We use page_number+1 because our Walmart pages start at 1, but Python's range() function begins counting at 0.

"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}"
Next, we add a start_scrape() function. This one takes in a page count and runs scrape_search_results() on each page in the range.

```python
def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
```
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoup import concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed to get page {page_number}, status code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.text) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = { "name": name, "stars": rating, "url": link, "sponsored": sponsored, "price": price, "product_id": product_id } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
We now pass a page argument to both our parsing function and our URL, and start_scrape() allows us to scrape a list of pages.

Next, we need a dataclass. We're going to call this one SearchData, because we'll use it to represent objects from our search results.

```python
@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
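A quick, illustrative sketch (assuming the SearchData class above is in scope) of what the __post_init__ cleanup does:

```python
# Assumes the SearchData dataclass above has been defined.
item = SearchData(name="  Acer Aspire 5  ", stars=4.5, url="", price=399.0, product_id=123)
print(item.name)  # "Acer Aspire 5" -- leading/trailing whitespace is stripped
print(item.url)   # "No url" -- empty strings get a default value
```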
We also need a DataPipeline. This class opens up a pipe to a CSV file. On top of that, it does a couple of other important things: if the file doesn't exist, our class creates it; if the CSV does exist, it appends to it instead. Our DataPipeline also uses the name attribute to filter out duplicate data.

```python
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
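Here's a small, illustrative usage sketch (assuming the SearchData and DataPipeline classes above are in scope, plus import time, which close_pipeline relies on):

```python
# Assumes SearchData, DataPipeline and their imports (including `import time`) are defined above.
pipeline = DataPipeline(csv_filename="laptop.csv")

item = SearchData(name="Acer Aspire 5", stars=4.5,
                  url="https://www.walmart.com/reviews/product/123",
                  price=399.0, product_id=123)
pipeline.add_data(item)
pipeline.add_data(item)      # duplicate name -> logged and dropped

pipeline.close_pipeline()    # flushes anything still queued to laptop.csv
```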
From our main section, we now open a DataPipeline and pass it into our crawling functions. Then, instead of printing our data, we turn it into SearchData and pass it into the DataPipeline
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoup import concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed to get page {page_number}, status code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.text) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
In the code above:

- SearchData represents a search results object.
- DataPipeline is used to pipe dataclass (in this case, SearchData) objects to a CSV.

Next, we bring in ThreadPoolExecutor. This gives us the power of multithreading. We'll open up a new threadpool and then parse an individual page on each available thread. Take a look below: we've rewritten start_scrape() and removed the for loop.

```python
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
```

Here is the full code with concurrency added.
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoup import concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed to get page {page_number}, status code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.text) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
We pass the following into executor.map():

- scrape_search_results: our parsing function.
- Arrays of the arguments we want to use on each call.

executor then passes the args from these arrays into each call of scrape_search_results.
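If the repeated-list pattern looks odd, here's a tiny standalone illustration (unrelated to Walmart) of how executor.map() zips those lists together:

```python
import concurrent.futures

def describe(keyword, location, page):
    return f"{keyword} / {location} / page {page}"

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        describe,
        ["laptop"] * 3,   # same keyword for every call
        ["us"] * 3,       # same location for every call
        range(3)          # a different page number per call
    )
    print(list(results))  # ['laptop / us / page 0', 'laptop / us / page 1', 'laptop / us / page 2']
```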
To get past Walmart's anti-bot measures, we route our requests through the ScrapeOps Proxy API with a small helper function:

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
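For instance (using a placeholder API key), the helper simply URL-encodes the payload onto the proxy endpoint, producing something like this:

```python
from urllib.parse import urlencode

# Illustration only: what the proxied URL looks like for a placeholder key.
payload = {
    "api_key": "YOUR-SCRAPEOPS-API-KEY",
    "url": "https://www.walmart.com/search?q=laptop&page=1",
    "country": "us",
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-SCRAPEOPS-API-KEY&url=https%3A%2F%2Fwww.walmart.com%2Fsearch%3Fq%3Dlaptop%26page%3D1&country=us
```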
This function wraps any URL we give it in a payload holding our proxy parameters:

- "api_key": our ScrapeOps API key.
- "url": the URL we want to scrape.
- "country": the location we wish to appear in.

Here is the full crawler with the proxy integrated.

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page {page_number}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                if item["__typename"] != "Product":
                    continue
                name = item.get("name")
                product_id = item["usItemId"]
                if not name:
                    continue
                link = f"https://www.walmart.com/reviews/product/{product_id}"
                price = item["price"]
                sponsored = item["isSponsoredFlag"]
                rating = item["averageRating"]

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=link,
                    sponsored=sponsored,
                    price=price,
                    product_id=product_id
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
Here is the main block we'll be working with. We're now crawling 4 pages on 3 threads. Feel free to change any of these to tweak your results:

- MAX_RETRIES: Defines the maximum number of times the script will retry fetching a webpage if a request fails due to issues such as network timeouts or non-200 HTTP responses.
- MAX_THREADS: Sets the maximum number of threads that will be used concurrently while scraping.
- PAGES: The number of search result pages to scrape for each keyword.
- LOCATION: The location or country code where the products or reviews will be scraped from.
- keyword_list: A list of product keywords to search for on Walmart's website (e.g., ["laptop"]).

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
Next up is our review parser, process_item(). It pulls the customer reviews from the review page we saved for each product.

```python
def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

                script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
                json_data = json.loads(script_tag.text)
                review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"]

                for review in review_list:
                    name = review["userNickname"]
                    author_id = review["authorId"]
                    rating = review["rating"]
                    date = review["reviewSubmissionTime"]
                    review = review["reviewText"]

                    review_data = {
                        "name": name,
                        "author_id": author_id,
                        "rating": rating,
                        "date": date,
                        "review": review
                    }
                    print(review_data)

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
.json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"]
finds our list of customer reviews.name
: the name of the reviewer.author_id
: a unique identifier for the reviewer, much like our product_id
from earlier.rating
: the rating left by the reviewer.date
: the date that the review was left.review
: the actual text of the review, for instance "It was good. I really like [x] about this laptop."start_scrape()
from earlier.def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries)
This function reads our CSV file and runs process_item() on each row from the file using a for loop. We'll remove the for
loop a little later on when we add concurrency later on. You can see our fully updated code below.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoup import concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed to get page {page_number}, status code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.text) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.text) review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"] for review in review_list: name = review["userNickname"] author_id = review["authorId"] rating = review["rating"] date = review["reviewSubmissionTime"] review = review["reviewText"] review_data = { "name": name, "author_id": author_id, "rating": rating, "date": date, "review": review } print(review_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries 
left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
Here's what's happening:

- process_results() reads our CSV into an array and iterates through it.
- It then runs process_item() on each item from the array to scrape its reviews.

To store this data, we need a DataPipeline that can take in a dataclass, but the only class we have is SearchData. To address this, we're going to add another one called ReviewData. Then, from within our parsing function, we'll create a new DataPipeline and pass ReviewData objects into it while we parse. Take a look at ReviewData.

```python
@dataclass
class ReviewData:
    name: str = ""
    author_id: str = ""
    rating: int = 0
    date: str = ""
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
ReviewData
holds all of the data we extracted during the parse. You can see how everything works in the full code below.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoup import concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" author_id: str = "" rating: int = 0 date: str = "" review: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed to get page {page_number}, status code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.text) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.text) review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"] for review in review_list: name = review["userNickname"] author_id = review["authorId"] rating = review["rating"] date = review["reviewSubmissionTime"] review = review["reviewText"] review_data = ReviewData( name=name, author_id=author_id, rating=rating, date=date, review=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process 
page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
Inside our parsing function, `process_item()`, we open a new `DataPipeline` and pass `ReviewData` objects into the pipeline as we parse them.

To run the review scrape concurrently, `ThreadPoolExecutor` will open up a new pool of threads. We pass our parsing function in as the first argument; every other argument is passed in as an array, and the executor feeds those array elements, one per call, into our parser. Here is the finished `process_results()`.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
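If the `executor.map()` calling convention looks unusual, here is a tiny standalone sketch (toy function and values, not part of the scraper) showing how the argument arrays line up with each call:

```python
import concurrent.futures

def show(row, location, retries):
    # Each call receives one element from each iterable passed to executor.map()
    print(f"row={row}, location={location}, retries={retries}")

rows = ["row-a", "row-b", "row-c"]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(
        show,
        rows,                 # first positional argument, one element per call
        ["us"] * len(rows),   # second positional argument, repeated for every call
        [3] * len(rows)       # third positional argument, repeated for every call
    )
```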
We call `get_scrapeops_url()` from within our parser, and now we have a custom proxy for each request we make during our review scrape.

```python
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
```
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoup import concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" author_id: str = "" rating: int = 0 date: str = "" review: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed to get page {page_number}, status code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.text) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv") script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.text) review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"] for review in review_list: name = review["userNickname"] author_id = review["authorId"] rating = review["rating"] date = review["reviewSubmissionTime"] review = review["reviewText"] review_data = ReviewData( name=name, author_id=author_id, rating=rating, date=date, review=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: 
logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_item, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
When you scrape Walmart, you are subject to their terms of service and their robots.txt. Violating their policies can lead to suspension and even deletion of your account. You can view their terms here. Walmart's robots.txt is available here.

Then check out ScrapeOps, the complete toolkit for web scraping.
Before running the code, you'll need a `config.json` file containing your ScrapeOps API key. Then paste the script below into a Python file of your choice; we'll refer to it as `name_of_your_file.py`.
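The config file is just a small JSON object. Here is a minimal sketch that confirms it loads; the "api_key" field name matches what the script reads:

```python
import json

# config.json should sit next to your script and look like:
# {"api_key": "YOUR-SCRAPEOPS-API-KEY"}
with open("config.json", "r") as config_file:
    config = json.load(config_file)

assert "api_key" in config, "config.json must contain an 'api_key' entry"
print("API key loaded, length:", len(config["api_key"]))
```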
.import os import csv import json import logging import time import concurrent.futures from dataclasses import dataclass, field, fields, asdict from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclass class ReviewData: name: str = "" author_id: int = 0 rating: float = 0.0 date: str = "" review: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) # Set up Selenium WebDriver options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) driver.get(scrapeops_proxy_url) logger.info(f"Received page: {url}") # Wait for the page to load and get the script tag time.sleep(3) # Adjust the sleep time as needed script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute("innerHTML")) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True driver.quit() # Close the WebDriver except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 driver.quit() # Close the WebDriver on error if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row.get("url") tries = 0 success = False options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) logger.info(f"Attempting to access URL: {scrapeops_proxy_url}") driver.get(scrapeops_proxy_url) logger.info(f"Status: {driver.title}") # Wait for the page to load and get the script tag time.sleep(3) # Adjust the sleep time as needed script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute("innerHTML")) review_list = 
json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"] review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in review_list: name = review["userNickname"] author_id = review["authorId"] rating = review["rating"] date = review["reviewSubmissionTime"] review_text = review["reviewText"] review_data = ReviewData( name=name, author_id=author_id, rating=rating, date=date, review=review_text ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}") logger.warning(f"Retries left: {retries-tries}") tries += 1 driver.quit() # Close the WebDriver if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads, retries=3): logger.info(f"Processing {csv_file}") with open(csv_file, newline="", encoding="utf-8") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_item, reader, # Pass each row in the CSV as the first argument to process_item [location] * len(reader), # Location as second argument for all rows [retries] * len(reader) # Retries as third argument for all rows ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION,max_threads=MAX_THREADS, retries=MAX_RETRIES)
- `MAX_RETRIES`: Sets the maximum number of retries if a request fails. This can happen due to network timeouts or non-200 HTTP responses.
- `MAX_THREADS`: Defines how many threads will run at the same time during scraping.
- `PAGES`: The number of search result pages to scrape for each keyword.
- `LOCATION`: The location or country code for scraping products or reviews.
- `keyword_list`: A list of product keywords to search on Walmart's website (e.g., ["laptop"]).

Search results come from `https://www.walmart.com/search?q={formatted_keyword}`, and each product's reviews come from `https://www.walmart.com/reviews/product/{product_id}`.

Pagination is controlled with the `page` parameter. To view page 1, the URL will have `page=1`. The full paginated URL will be: `https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}`. We use `page_number+1` because Python's `range()` function starts counting at 0 (see the short sketch after this list of settings).

To control our geolocation, we use the `country` parameter with the ScrapeOps Proxy API. This parameter allows us to choose the country. To appear in the US, use `"country": "us"`; to appear in the UK, use `"country": "uk"`.
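Here is a quick sketch (hypothetical keyword) of the URLs a four-page crawl actually requests, showing how `range()` maps onto Walmart's 1-based pages:

```python
# range(PAGES) yields 0..PAGES-1, so we add 1 to get Walmart's 1-based page numbers.
formatted_keyword = "gaming laptop".replace(" ", "+")

for page_number in range(4):
    print(f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}")

# https://www.walmart.com/search?q=gaming+laptop&page=1
# https://www.walmart.com/search?q=gaming+laptop&page=2
# https://www.walmart.com/search?q=gaming+laptop&page=3
# https://www.walmart.com/search?q=gaming+laptop&page=4
```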
.mkdir walmart-scraper
cd walmart-scraper
python -m venv venv
source venv/bin/activate
pip install selenium
pip install webdriver-manager
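Optionally, you can confirm the install with a quick headless run. This is a minimal sketch; it assumes Chrome is installed locally, and Walmart may serve a bot-check page to an unproxied headless browser:

```python
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

options = Options()
options.add_argument("--headless")

# webdriver-manager downloads a matching chromedriver automatically
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
driver.get("https://www.walmart.com")
print(driver.title)  # may show a bot-check title if Walmart challenges the request
driver.quit()
```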
We'll start by building a parsing function, `scrape_search_results()`
.Here’s the code we’ll start with:import os import json import logging import time from selenium import webdriver from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options import concurrent.futures from dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}" tries = 0 success = False # Set up Selenium WebDriver options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) while tries < retries and not success: try: driver.get(url) logger.info(f"Received page from: {url}") # Wait for the necessary elements to load time.sleep(3) # Adjust sleep time as necessary # Retrieve the JSON data from the page script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute('innerText')) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = { "name": name, "stars": rating, "url": link, "sponsored": sponsored, "price": price, "product_id": product_id } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}") tries += 1 driver.quit() # Ensure to close the browser if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
We use `driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']")` to locate the JSON and `json.loads()` to turn its text into a JSON object. Our product list sits at `json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]`.

To paginate our results, we request `https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}`. We use `page_number+1` because Walmart pages start at 1, while Python's `range()` starts from 0.

Next, we create a `start_scrape()` function. This function takes a page count and runs `scrape_search_results()` on each page.

```python
def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
```
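If the long key path looks opaque, here is a tiny, made-up `__NEXT_DATA__` payload (illustrative only; the real blob is far larger) showing why that chain of keys reaches the product list:

```python
import json

sample_blob = """
{
  "props": {
    "pageProps": {
      "initialData": {
        "searchResult": {
          "itemStacks": [
            {"items": [{"__typename": "Product", "name": "Example Laptop", "usItemId": "123456789"}]}
          ]
        }
      }
    }
  }
}
"""

json_data = json.loads(sample_blob)
items = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]
print(items[0]["name"])  # Example Laptop
```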
import os import json import logging import time from selenium import webdriver from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options import concurrent.futures from dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number + 1}" tries = 0 success = False # Set up Selenium WebDriver options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) while tries < retries and not success: try: driver.get(url) logger.info(f"Received page from: {url}") # Wait for the necessary elements to load time.sleep(3) # Adjust sleep time as necessary # Retrieve the JSON data from the page script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute('innerText')) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = { "name": name, "stars": rating, "url": link, "sponsored": sponsored, "price": price, "product_id": product_id } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}") tries += 1 driver.quit() # Ensure to close the browser if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 2 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
To store our data properly, we first create a `dataclass`. We'll name it `SearchData`, as it will represent objects from our search results.

```python
@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
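To see what the `__post_init__()` cleanup does, here is a quick illustration with made-up values: empty strings get a readable default and stray whitespace is stripped.

```python
product = SearchData(
    name="  Acme 15.6 inch Laptop  ",
    stars=4.5,
    url="",
    sponsored=False,
    price=499.0,
    product_id=123456789
)

print(product.name)  # "Acme 15.6 inch Laptop" (whitespace stripped)
print(product.url)   # "No url" (empty string replaced with a default)
```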
Next comes the `DataPipeline`. This class establishes a connection to a CSV file. In addition, it performs a few other crucial tasks. If the file isn't already present, the class creates it; if the CSV file exists, it appends to it instead. The `DataPipeline` also utilizes the `name` attribute to eliminate duplicate entries.

```python
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
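Here is a small usage sketch (made-up products) showing the de-duplication and the CSV flush in action:

```python
pipeline = DataPipeline(csv_filename="example-products.csv", storage_queue_limit=50)

pipeline.add_data(SearchData(name="Example Laptop A", stars=4.7, url="https://www.walmart.com/reviews/product/111", price=399.0, product_id=111))
pipeline.add_data(SearchData(name="Example Laptop A", stars=4.7, url="https://www.walmart.com/reviews/product/111", price=399.0, product_id=111))  # logged and dropped as a duplicate
pipeline.add_data(SearchData(name="Example Laptop B", stars=4.2, url="https://www.walmart.com/reviews/product/222", price=549.0, product_id=222))

pipeline.close_pipeline()  # flushes the two unique rows to example-products.csv
```

The fully updated code is shown below.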
import os import csv import json import logging import time from selenium import webdriver from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number + 1}" tries = 0 success = False # Set up Selenium WebDriver options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) while tries < retries and not success: try: driver.get(url) logger.info(f"Received page from: {url}") # Wait for the necessary elements to load time.sleep(3) # Adjust sleep time as necessary # Retrieve the JSON data from the page script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute('innerText')) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}") tries += 1 driver.quit() # Ensure to close the browser if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
We use `ThreadPoolExecutor` to enable multithreading. By opening a new threadpool, we can parse a single page on each available thread. Below is a rewritten version of `start_scrape()`, where the for loop has been eliminated.

```python
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
```
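As a usage sketch (assuming the `DataPipeline` defined earlier in this section), a three-page threaded crawl for one keyword would look like this:

```python
pipeline = DataPipeline(csv_filename="laptop.csv")
start_scrape("laptop", 3, "us", data_pipeline=pipeline, max_threads=3, retries=3)
pipeline.close_pipeline()
```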
import os import csv import json import logging import time import concurrent.futures from dataclasses import dataclass, field, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number + 1}" tries = 0 success = False # Set up Selenium WebDriver options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) while tries <= retries and not success: try: driver.get(url) time.sleep(3) # Allow time for the page to load # Find and parse the script tag with the JSON data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute("innerHTML")) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") driver.quit() # Close the WebDriver def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["laptop"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
When we call `executor.map()`, pay attention to our arguments: `scrape_search_results` is the function for parsing. All other arguments are passed as arrays, and the executor feeds their elements, one per call, into `scrape_search_results`.

Next, we add proxy support. `get_scrapeops_url()` takes a regular URL and converts it into a ScrapeOps proxied URL.

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
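For example (illustrative values, not part of the scraper), wrapping a Walmart search URL percent-encodes it into the proxy endpoint's query string:

```python
target = "https://www.walmart.com/search?q=laptop&page=1"
print(get_scrapeops_url(target, location="uk"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.walmart.com%2Fsearch%3Fq%3Dlaptop%26page%3D1&country=uk
```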
import os import csv import json import logging import time from urllib.parse import urlencode import concurrent.futures from dataclasses import dataclass, field, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number + 1}" tries = 0 success = False # Set up Selenium WebDriver options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) # Use Selenium to load the page time.sleep(3) # Allow time for the page to load # Find and parse the script tag with the JSON data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute("innerHTML")) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") driver.quit() # Close the WebDriver def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["laptop"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
- `MAX_RETRIES`: Specifies the maximum attempts the script will make to retrieve a webpage if a request fails due to network issues or non-200 HTTP status codes.
- `MAX_THREADS`: Determines the highest number of threads that will run simultaneously during the scraping process.
- `PAGES`: The total number of result pages to scrape for every keyword.
- `LOCATION`: Indicates the region or country code from which products or reviews will be scraped.
- `keyword_list`: Contains a list of product keywords to search on Walmart's website (for example, ["laptop"]).

```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    keyword_list = ["laptop"]
    aggregate_files = []

    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")
```
def process_item(row,location, retries=3): url = row["url"] tries = 0 success = False # Set up Selenium WebDriver options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) while tries <= retries and not success: try: driver.get(url) time.sleep(3) # Allow time for the page to load # Find and parse the script tag with the JSON data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute("innerHTML")) review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"] review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in review_list: name = review["userNickname"] author_id = review["authorId"] rating = review["rating"] date = review["reviewSubmissionTime"] review_text = review["reviewText"] review_data = ReviewData( name=name, author_id=author_id, rating=rating, date=date, review=review_text ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True logger.info(f"Successfully parsed: {url}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries - tries}") tries += 1 driver.quit() # Close the WebDriver if not success: raise Exception(f"Max Retries exceeded: {retries}")
We find our JSON with `driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']")`. The reviews are located at `json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"]`. From each review we pull:

- `name`: the reviewer's name.
- `author_id`: a unique ID for the reviewer, similar to the product_id mentioned previously.
- `rating`: the score given by the reviewer.
- `date`: the date on which the review was posted.
- `review`: the review text, such as "It was good. I really liked [x] about this laptop."

Our `process_results()` function is similar to `start_scrape()` from before.

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)
```

It runs `process_item()`
on every row from the file. We will remove the for loop later when we incorporate concurrency.The fully updated code is shown below.import os import csv import json import logging import time import concurrent.futures from dataclasses import dataclass, field, fields, asdict from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) # Set up Selenium WebDriver options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) driver.get(scrapeops_proxy_url) logger.info(f"Received page: {url}") # Wait for the page to load and get the script tag time.sleep(3) # Adjust the sleep time as needed script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute("innerHTML")) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True driver.quit() # Close the WebDriver except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 driver.quit() # Close the WebDriver on error if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row.get("url") tries = 0 success = False options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) while tries <= retries and not success: try: driver.get(url) logger.info(f"Status: {driver.title}") # Wait for the page to load and get the script tag time.sleep(3) # Adjust the sleep time as needed script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute("innerHTML")) review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"] review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in review_list: name = 
review["userNickname"] author_id = review["authorId"] rating = review["rating"] date = review["reviewSubmissionTime"] review = review["reviewText"] review_data = { "name": name, "author_id": author_id, "rating": rating, "date": date, "review": review } print(review_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}") logger.warning(f"Retries left: {retries-tries}") tries += 1 driver.quit() # Close the WebDriver if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"Processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
`process_results()` reads our CSV file into an array and iterates over it. During each iteration, we call `process_item()` on the individual items from the array to scrape their reviews.

So far, our only dataclass is `SearchData`, and it doesn't describe a review. To solve this, we will introduce another one called `ReviewData`. Then, inside our parsing function, we will create a new `DataPipeline` and send `ReviewData` objects to it as we parse. Here's a look at `ReviewData`.

```python
@dataclass
class ReviewData:
    name: str = ""
    author_id: int = 0
    rating: float = 0.0
    date: str = ""
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```

Inside `process_item()`, we now open a new `DataPipeline` and store each review as `ReviewData`
. The full code below shows how everything functions.import os import csv import json import logging import time import concurrent.futures from dataclasses import dataclass, field, fields, asdict from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" stars: float = 0 url: str = "" sponsored: bool = False price: float = 0.0 product_id: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclass class ReviewData: name: str = "" author_id: int = 0 rating: float = 0.0 date: str = "" review: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) # Set up Selenium WebDriver options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) driver.get(scrapeops_proxy_url) logger.info(f"Received page: {url}") # Wait for the page to load and get the script tag time.sleep(3) # Adjust the sleep time as needed script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute("innerHTML")) item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] for item in item_list: if item["__typename"] != "Product": continue name = item.get("name") product_id = item["usItemId"] if not name: continue link = f"https://www.walmart.com/reviews/product/{product_id}" price = item["price"] sponsored = item["isSponsoredFlag"] rating = item["averageRating"] search_data = SearchData( name=name, stars=rating, url=link, sponsored=sponsored, price=price, product_id=product_id ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True driver.quit() # Close the WebDriver except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 driver.quit() # Close the WebDriver on error if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row.get("url") tries = 0 success = False options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) while tries <= retries and not success: try: driver.get(url) logger.info(f"Status: {driver.title}") # Wait for the page to load and get the script tag time.sleep(3) # Adjust the sleep time as needed script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']") json_data = json.loads(script_tag.get_attribute("innerHTML")) review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"] review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in review_list: name = 
review["userNickname"] author_id = review["authorId"] rating = review["rating"] date = review["reviewSubmissionTime"] review = review["reviewText"] review_data = ReviewData( name=name, author_id=author_id, rating=rating, date=date, review=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}") logger.warning(f"Retries left: {retries-tries}") tries += 1 driver.quit() # Close the WebDriver if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"Processing {csv_file}") with open(csv_file, newline="", encoding="utf-8") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 3 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["laptop"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
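One small detail from the listing above worth calling out: each product gets its own review CSV, named after the product pulled from the crawl file. Here's a quick sketch of that filename logic, using a made-up product name:

```python
# Made-up row, standing in for one line of the crawl CSV.
row = {"name": "2021 Apple 10.2-inch iPad Wi-Fi 64GB"}

# Same expression process_item() uses to name the per-product review file.
csv_filename = f"{row['name'].replace(' ', '-')}.csv"
print(csv_filename)  # 2021-Apple-10.2-inch-iPad-Wi-Fi-64GB.csv
```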
To add concurrency to our review scraper, we once again use ThreadPoolExecutor. Our parsing function is passed in as the first argument, and everything else is passed in as arrays (one element per row) for executor.map() to send into our parser.

Here is the final process_results():

```python
def process_results(csv_file, location, max_threads, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,                    # Pass each row in the CSV as the first argument to process_item
                [location] * len(reader),  # Location as second argument for all rows
                [retries] * len(reader)    # Retries as third argument for all rows
            )
```
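If the way executor.map() fans those arguments out isn't obvious, here is a small, self-contained sketch with made-up values. It mirrors what process_results() does, but swaps process_item() for a stand-in so nothing touches Walmart:

```python
import concurrent.futures

def pretend_process_item(row, location, retries=3):
    # Stand-in for process_item(): just report which arguments it received.
    print(f"row={row['name']}, location={location}, retries={retries}")

# Made-up rows, standing in for the CSV produced by the crawl.
reader = [{"name": "laptop-1"}, {"name": "laptop-2"}, {"name": "laptop-3"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(
        pretend_process_item,     # called once per element below
        reader,                   # first argument: one row per call
        ["us"] * len(reader),     # second argument: same location for every call
        [3] * len(reader)         # third argument: same retry count for every call
    )
```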
To add proxy support, we simply wrap our URL with get_scrapeops_url(), and now, for each request made during our review scrape, we have a custom proxy.

```python
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
```
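For clarity, here's roughly what that wrapped URL looks like. The snippet repeats the get_scrapeops_url() helper so it runs on its own, the API key is a placeholder, and the product id is made up:

```python
from urllib.parse import urlencode

API_KEY = "YOUR-API-KEY"  # placeholder, not a real key

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Made-up product id, just to show the encoding.
print(get_scrapeops_url("https://www.walmart.com/reviews/product/123456789"))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-API-KEY&url=https%3A%2F%2Fwww.walmart.com%2Freviews%2Fproduct%2F123456789&country=us
```

With the proxy wired into process_item(), the full production-ready scraper looks like the listing below.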
```python
import os
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode

from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    author_id: int = 0
    rating: float = 0.0
    date: str = ""
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)

            # Set up Selenium WebDriver
            options = Options()
            options.add_argument("--headless")
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")

            driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Received page: {url}")

            # Wait for the page to load and get the script tag
            time.sleep(3)  # Adjust the sleep time as needed
            script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']")
            json_data = json.loads(script_tag.get_attribute("innerHTML"))

            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                if item["__typename"] != "Product":
                    continue
                name = item.get("name")
                product_id = item["usItemId"]
                if not name:
                    continue
                link = f"https://www.walmart.com/reviews/product/{product_id}"
                price = item["price"]
                sponsored = item["isSponsoredFlag"]
                rating = item["averageRating"]

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=link,
                    sponsored=sponsored,
                    price=price,
                    product_id=product_id
                )

                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True
            driver.quit()  # Close the WebDriver

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
            driver.quit()  # Close the WebDriver on error

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row.get("url")
    tries = 0
    success = False

    options = Options()
    options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")

    driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            logger.info(f"Attempting to access URL: {scrapeops_proxy_url}")
            driver.get(scrapeops_proxy_url)
            logger.info(f"Status: {driver.title}")

            # Wait for the page to load and get the script tag
            time.sleep(3)  # Adjust the sleep time as needed
            script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'][type='application/json']")
            json_data = json.loads(script_tag.get_attribute("innerHTML"))

            review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"]

            review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

            for review in review_list:
                name = review["userNickname"]
                author_id = review["authorId"]
                rating = review["rating"]
                date = review["reviewSubmissionTime"]
                review_text = review["reviewText"]

                review_data = ReviewData(
                    name=name,
                    author_id=author_id,
                    rating=rating,
                    date=date,
                    review=review_text
                )

                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {url}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    driver.quit()  # Close the WebDriver

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,                    # Pass each row in the CSV as the first argument to process_item
                [location] * len(reader),  # Location as second argument for all rows
                [retries] * len(reader)    # Retries as third argument for all rows
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    # Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
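To run a larger job, you only need to tweak the constants in the main block. The run below keeps the same setup but bumps PAGES up to 3.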
```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    # Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
When you scrape Walmart, you are subject to their terms of service and their robots.txt. Violating their policies can lead to suspension and even deletion of your account. You can view their terms here. Walmart's robots.txt is available here.