Then check out ScrapeOps, the complete toolkit for web scraping.
To run the full code below, add your ScrapeOps API key to a config.json file in your project folder:

```json
{"api_key": "your-super-secret-api-key"}
```

Then paste the script into a Python file and run it with python name_of_your_script.py!

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    model_number: str = ""
    sku: str = ""
    rating: float = 0.0
    sponsored: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    rating: float = 0.0
    incentivized: bool = False
    verified: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="shop-sku-list-item")

            for div_card in div_cards:
                sponsored = False
                sponsored_tag = div_card.find("div", class_="is-sponsored")
                if sponsored_tag:
                    sponsored = True

                name = div_card.find("h4", class_="sku-title").text
                price_holder = div_card.select_one("div[data-testid='customer-price']")
                price = price_holder.select_one("span[aria-hidden='true']").text

                model_holder = div_card.find("div", class_="sku-model")
                model_info_array = model_holder.find_all("span", class_="sku-value")
                model_number = model_info_array[0].text
                sku_number = model_info_array[1].text

                rating_holder = div_card.find("div", class_="ratings-reviews")
                href = rating_holder.find("a")
                link = "n/a"
                if href:
                    link = f"https://www.bestbuy.com{href.get('href')}"
                rating_text = rating_holder.find("p", class_="visually-hidden").text
                rating = 0.0
                if rating_text != "Not Yet Reviewed":
                    rating = rating_text.split(" ")[1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    model_number=model_number,
                    sku=sku_number,
                    rating=rating,
                    sponsored=sponsored
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    if url == "n/a":
        return
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.find_all("li", class_="review-item-simple")
                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

                for review_card in review_cards:
                    rating_holder = review_card.find("div", class_="review-rating")
                    rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
                    name = review_card.find("h4").text

                    incentivized = False
                    incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
                    if incentivized_button:
                        incentivized = True

                    verified = False
                    verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
                    if verified_button:
                        verified = True

                    review_data = ReviewData(
                        name=name,
                        rating=rating,
                        incentivized=incentivized,
                        verified=verified
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True
            else:
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["gpu"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
- MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
- MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
- PAGES: Determines how many pages of Best Buy search results to scrape for each keyword.
- LOCATION: Specifies the geographical location (country) to route requests through.
- keyword_list: The list of keywords for which the script will perform the search and subsequent scraping.

Now, take a look at a Best Buy search URL for the keyword gpu:

https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu

The base of the URL is:
https://www.bestbuy.com/site/searchpage.jsp
The query string comes after the ?, and each query param is separated by &. The one we need to pay attention to here is st=gpu. st represents our search term, in this case, gpu.
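If you'd rather not piece query strings together by hand, urlencode() from Python's standard library produces the same URL. This is just a quick sketch; build_search_url() is a hypothetical helper, and the scraper we build below simply formats the URL with an f-string.

```python
from urllib.parse import urlencode

# Build a Best Buy search URL for an arbitrary keyword (illustrative sketch).
def build_search_url(keyword):
    params = {"st": keyword}
    return "https://www.bestbuy.com/site/searchpage.jsp?" + urlencode(params)

print(build_search_url("gpu"))
# https://www.bestbuy.com/site/searchpage.jsp?st=gpu
```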
Now, let's take a look at BestBuy's individual product pages. Here is an example URL:
https://www.bestbuy.com/site/asus-tuf-gaming-nvidia-geforce-rtx-4080-super-overclock-16gb-gddr6x-pci-express-4-0-graphics-card-black/6574587.p?skuId=6574587.
As you can see in our URL, it gets laid out like this:
https://www.bestbuy.com/site/{PRODUCT_NAME}/{SKU_NUMBER}.p?skuId={SKU_NUMBER}
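Both the path segment and the skuId query parameter carry the same SKU number, so you can recover it from either one. Here's a small sketch using only the standard library; extract_sku() is a hypothetical helper, not part of the scraper we build below.

```python
from urllib.parse import urlparse, parse_qs

# Hypothetical helper: pull the SKU out of a Best Buy product URL.
def extract_sku(product_url):
    query = parse_qs(urlparse(product_url).query)
    return query.get("skuId", ["n/a"])[0]

url = "https://www.bestbuy.com/site/asus-tuf-gaming-nvidia-geforce-rtx-4080-super-overclock-16gb-gddr6x-pci-express-4-0-graphics-card-black/6574587.p?skuId=6574587"
print(extract_sku(url))  # 6574587
```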
Each item on the search results page is embedded in a div with a class of shop-sku-list-item. When we find this item, we can pick through it and find all of our relevant data.
On the product page, we follow a similar structure with reviews. Each review is held in an li element with a class of review-item-simple.
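Those two selectors are the backbone of everything we parse below. As a quick sanity check, assuming search_html and product_html hold raw page source you've already fetched by some means (they are placeholders here, not real variables from the scraper), you could count the matching elements like this:

```python
from bs4 import BeautifulSoup

# search_html / product_html are assumed to contain page source you fetched yourself.
search_soup = BeautifulSoup(search_html, "html.parser")
product_soup = BeautifulSoup(product_html, "html.parser")

result_cards = search_soup.find_all("div", class_="shop-sku-list-item")   # search results
review_cards = product_soup.find_all("li", class_="review-item-simple")   # product reviews
print(len(result_cards), len(review_cards))
```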
https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu
Notice the parameter cp=1. cp represents our page number.
When talking to the ScrapeOps Proxy API, we can pass a country parameter, and ScrapeOps will route us through the country of our choice.

If we want to appear in the US, we can pass "country": "us". If we want to appear in the UK, we can pass "country": "uk".

Let's get started. First, create a new project folder and move into it:

```bash
mkdir bestbuy-scraper
cd bestbuy-scraper
```
Next, create a virtual environment, activate it, and install our dependencies:

```bash
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
```

With the environment ready, we can write our first parsing function, scrape_search_results().
```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.bestbuy.com/site/searchpage.jsp?st={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="shop-sku-list-item")

            for div_card in div_cards:
                sponsored = False
                sponsored_tag = div_card.find("div", class_="is-sponsored")
                if sponsored_tag:
                    sponsored = True

                name = div_card.find("h4", class_="sku-title").text
                price_holder = div_card.select_one("div[data-testid='customer-price']")
                price = price_holder.select_one("span[aria-hidden='true']").text

                model_holder = div_card.find("div", class_="sku-model")
                model_info_array = model_holder.find_all("span", class_="sku-value")
                model_number = model_info_array[0].text
                sku_number = model_info_array[1].text

                rating_holder = div_card.find("div", class_="ratings-reviews")
                href = rating_holder.find("a")
                link = "n/a"
                if href:
                    link = f"https://www.bestbuy.com{href.get('href')}"
                rating_text = rating_holder.find("p", class_="visually-hidden").text
                rating = 0.0
                if rating_text != "Not Yet Reviewed":
                    rating = rating_text.split(" ")[1]

                search_data = {
                    "name": name,
                    "url": link,
                    "price": price,
                    "model_number": model_number,
                    "sku": sku_number,
                    "rating": rating,
                    "sponsored": sponsored
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["gpu"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
```
Here's what happens inside scrape_search_results():
- soup.find_all("div", class_="shop-sku-list-item") finds all of our items on the page.
- div_card.find("h4", class_="sku-title").text extracts the item's name.
- div_card.find("div", class_="is-sponsored") lets us know whether or not the item is sponsored.
- div_card.select_one("div[data-testid='customer-price']") finds our price_holder.
- price_holder.select_one("span[aria-hidden='true']").text extracts our price information.
- div_card.find("div", class_="sku-model") finds our model_holder.
- From the model_holder, we go through and extract both the model_number and sku_number.
- Finally, we use the rating_holder to extract both our rating and the link to the product.

Now we need the ability to control our pagination. To do this, we use the cp parameter.
We'll reformat our URL to look like this:
https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}
We use page_number+1 because Python's builtin range() function begins counting at 0, but our pages begin at 1 (the quick sketch below shows the mapping). We also need to write a function that calls scrape_search_results() on multiple pages.
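Here's a tiny, standalone illustration of that off-by-one mapping between range() and the cp parameter:

```python
# range() is zero-based, while Best Buy's cp parameter starts at 1.
keyword = "gpu"
for page_number in range(3):
    print(f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={keyword}")
# ...?cp=1&st=gpu
# ...?cp=2&st=gpu
# ...?cp=3&st=gpu
```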
Here is our new function, start_scrape().
```python
def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
```
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="shop-sku-list-item") for div_card in div_cards: sponsored = False sponsored_tag = div_card.find("div", class_="is-sponsored") if sponsored_tag: sponsored = True name = div_card.find("h4", class_="sku-title").text price_holder = div_card.select_one("div[data-testid='customer-price']") price = price_holder.select_one("span[aria-hidden='true']").text model_holder = div_card.find("div", class_="sku-model") model_info_array = model_holder.find_all("span", class_="sku-value") model_number = model_info_array[0].text sku_number = model_info_array[1].text rating_holder = div_card.find("div", class_="ratings-reviews") href = rating_holder.find("a") link = "n/a" if href: link = f"https://www.bestbuy.com{href.get('href')}" rating_text = rating_holder.find("p", class_="visually-hidden").text rating = 0.0 if rating_text != "Not Yet Reviewed": rating = rating_text.split(" ")[1] search_data = { "name": name, "url": link, "price": price, "model_number": model_number, "sku": sku_number, "rating": rating, "spoonsored": sponsored } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
start_scrape() allows us to parse a list of pages. Now we need to store the data we extract. To do that, we'll create a dataclass, and we're going to build a DataPipeline that saves that dataclass to a CSV file.
Here is our new dataclass. We'll call it SearchData.
```python
@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    model_number: str = ""
    sku: str = ""
    rating: float = 0.0
    sponsored: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
Once parsed, our SearchData gets passed into the DataPipeline below. The pipeline opens a pipe to a CSV file and removes duplicates based on their name.
```python
class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
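Here's a quick, hypothetical usage sketch of the pipeline on its own (the filename and field values are made up); the real script wires it up through start_scrape() below.

```python
# Hypothetical standalone usage of DataPipeline with SearchData.
pipeline = DataPipeline(csv_filename="example-products.csv")
pipeline.add_data(SearchData(name="Example GPU", url="n/a", price="$499.99",
                             model_number="ABC-123", sku="1234567", rating=4.5))
# A second item with the same name is detected as a duplicate and dropped.
pipeline.add_data(SearchData(name="Example GPU", url="n/a", price="$499.99",
                             model_number="ABC-123", sku="1234567", rating=4.5))
pipeline.close_pipeline()  # flushes anything left in the queue to the CSV
```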
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" model_number: str = "" sku: str = "" rating: float = 0.0 spoonsored: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="shop-sku-list-item") for div_card in div_cards: sponsored = False sponsored_tag = div_card.find("div", class_="is-sponsored") if sponsored_tag: sponsored = True name = div_card.find("h4", class_="sku-title").text price_holder = div_card.select_one("div[data-testid='customer-price']") price = price_holder.select_one("span[aria-hidden='true']").text model_holder = div_card.find("div", class_="sku-model") model_info_array = model_holder.find_all("span", class_="sku-value") model_number = model_info_array[0].text sku_number = model_info_array[1].text rating_holder = div_card.find("div", class_="ratings-reviews") href = rating_holder.find("a") link = "n/a" if href: link = f"https://www.bestbuy.com{href.get('href')}" rating_text = rating_holder.find("p", class_="visually-hidden").text rating = 0.0 if rating_text != "Not Yet Reviewed": rating = rating_text.split(" ")[1] search_data = SearchData( name=name, url=link, price=price, model_number=model_number, sku=sku_number, rating=rating, spoonsored=sponsored ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
From our main, we open a DataPipeline and pass it into start_scrape(). The DataPipeline then gets passed into our parsing function. Instead of printing our parsed results, we now turn them into SearchData and pass them into the DataPipeline once they've been parsed.

Next, we'll add concurrency using ThreadPoolExecutor. This opens up a new set of threads with whatever limit we choose. On each of these threads, we call a function. This function will then run simultaneously on each thread, giving us the ability to parse multiple pages at once.
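If ThreadPoolExecutor is new to you, here's a tiny toy example of the pattern we're about to use. The greet() function and its argument lists are made up purely for illustration.

```python
import concurrent.futures

def greet(name, punctuation):
    print(f"Hello, {name}{punctuation}")

names = ["Alice", "Bob", "Carol"]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # executor.map() takes the function first, then one iterable per argument.
    executor.map(greet, names, ["!"] * len(names))
```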
To accomplish this, we're going to replace our for loop with something better inside start_scrape().
```python
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
```
executor.map() holds all of our key logic here; pay attention to the arguments:
scrape_search_results is the function we want to call on each thread.scrape_search_results.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" model_number: str = "" sku: str = "" rating: float = 0.0 spoonsored: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="shop-sku-list-item") for div_card in div_cards: sponsored = False sponsored_tag = div_card.find("div", class_="is-sponsored") if sponsored_tag: sponsored = True name = div_card.find("h4", class_="sku-title").text price_holder = div_card.select_one("div[data-testid='customer-price']") price = price_holder.select_one("span[aria-hidden='true']").text model_holder = div_card.find("div", class_="sku-model") model_info_array = model_holder.find_all("span", class_="sku-value") model_number = model_info_array[0].text sku_number = model_info_array[1].text rating_holder = div_card.find("div", class_="ratings-reviews") href = rating_holder.find("a") link = "n/a" if href: link = f"https://www.bestbuy.com{href.get('href')}" rating_text = rating_holder.find("p", class_="visually-hidden").text rating = 0.0 if rating_text != "Not Yet Reviewed": rating = rating_text.split(" ")[1] search_data = SearchData( name=name, url=link, price=price, model_number=model_number, sku=sku_number, rating=rating, spoonsored=sponsored ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
To route our requests through the ScrapeOps Proxy API, we'll write a simple function, get_scrapeops_url().
```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
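For example, calling it on a search page URL yields something like the output shown in the comment below (the API key is a placeholder, and the exact encoding depends on your key):

```python
print(get_scrapeops_url("https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu"))
# https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.bestbuy.com%2Fsite%2Fsearchpage.jsp%3Fcp%3D1%26st%3Dgpu&country=us
```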
This function takes in a url, wraps it in a payload, and builds a proxied url. Pay close attention to the payload:
"api_key": you ScrapeOps API key."url": the url you want to scrape."country": the country we wish to appear in.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" model_number: str = "" sku: str = "" rating: float = 0.0 spoonsored: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="shop-sku-list-item") for div_card in div_cards: sponsored = False sponsored_tag = div_card.find("div", class_="is-sponsored") if sponsored_tag: sponsored = True name = div_card.find("h4", class_="sku-title").text price_holder = div_card.select_one("div[data-testid='customer-price']") price = price_holder.select_one("span[aria-hidden='true']").text model_holder = div_card.find("div", class_="sku-model") model_info_array = model_holder.find_all("span", class_="sku-value") model_number = model_info_array[0].text sku_number = model_info_array[1].text rating_holder = div_card.find("div", class_="ratings-reviews") href = rating_holder.find("a") link = "n/a" if href: link = f"https://www.bestbuy.com{href.get('href')}" rating_text = rating_holder.find("p", class_="visually-hidden").text rating = 0.0 if rating_text != "Not Yet Reviewed": rating = rating_text.split(" ")[1] search_data = SearchData( name=name, url=link, price=price, model_number=model_number, sku=sku_number, rating=rating, spoonsored=sponsored ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Our crawler is now ready to run in production. This time we'll crawl 5 pages of results; you can see the updated settings in our main in the snippet below.
```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["gpu"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
- MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
- MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
- PAGES: Determines how many pages of Best Buy search results to scrape for each keyword.
- LOCATION: Specifies the geographical location (country) to route requests through.
- keyword_list: The list of keywords for which the script will perform the search and subsequent scraping.

Now that our crawler is finished, it's time to build the review scraper. We'll start with a parsing function, process_item(), which pulls the reviews from an individual product page.

```python
def process_item(row, location, retries=3):
    url = row["url"]
    if url == "n/a":
        return
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.find_all("li", class_="review-item-simple")

                for review_card in review_cards:
                    rating_holder = review_card.find("div", class_="review-rating")
                    rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
                    name = review_card.find("h4").text

                    incentivized = False
                    incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
                    if incentivized_button:
                        incentivized = True

                    verified = False
                    verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
                    if verified_button:
                        verified = True

                    review_data = {
                        "name": name,
                        "rating": rating,
                        "incentivized": incentivized,
                        "verified": verified
                    }
                    print(review_data)

                success = True
            else:
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
For each review_card, we follow these steps to extract our data:

- review_card.find("div", class_="review-rating") finds the rating_holder.
- float(rating_holder.find("p", class_="visually-hidden").text.split()[1]) gives us our rating.
- review_card.find("h4").text gives us our name.
- review_card.select_one("button[title='badge for Incentivized']") tells us whether or not the review was incentivized.
- review_card.select_one("button[title='badge for Verified Purchaser']") tells us whether or not the purchase was verified.

Next, we need a function that reads our crawler's CSV file into an array of dict objects. Then it passes each of those objects into process_item().
Here is process_results().
```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)
```
When we combine it with process_item() and add the whole thing to our code, it looks like this.
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" model_number: str = "" sku: str = "" rating: float = 0.0 spoonsored: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="shop-sku-list-item") for div_card in div_cards: sponsored = False sponsored_tag = div_card.find("div", class_="is-sponsored") if sponsored_tag: sponsored = True name = div_card.find("h4", class_="sku-title").text price_holder = div_card.select_one("div[data-testid='customer-price']") price = price_holder.select_one("span[aria-hidden='true']").text model_holder = div_card.find("div", class_="sku-model") model_info_array = model_holder.find_all("span", class_="sku-value") model_number = model_info_array[0].text sku_number = model_info_array[1].text rating_holder = div_card.find("div", class_="ratings-reviews") href = rating_holder.find("a") link = "n/a" if href: link = f"https://www.bestbuy.com{href.get('href')}" rating_text = rating_holder.find("p", class_="visually-hidden").text rating = 0.0 if rating_text != "Not Yet Reviewed": rating = rating_text.split(" ")[1] search_data = SearchData( name=name, url=link, price=price, model_number=model_number, sku=sku_number, rating=rating, spoonsored=sponsored ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] if url == "n/a": return tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = soup.find_all("li", class_="review-item-simple") for review_card in review_cards: rating_holder = review_card.find("div", class_="review-rating") rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1]) name = review_card.find("h4").text incentivized = False incentivized_button = review_card.select_one("button[title='badge for Incentivized']") if incentivized_button: incentivized = True verified = False verified_button = 
review_card.select_one("button[title='badge for Verified Purchaser']") if verified_button: incentivized = True review_data = { "name": name, "rating": rating, "incentivized": incentivized, "verified": verified } print(review_data) success = True else: raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
Just like we did with our search results, we need to store the review data we extract. We'll start with another dataclass. Since this one is used to represent reviews, we'll call it ReviewData. It's very similar to SearchData.
```python
@dataclass
class ReviewData:
    name: str = ""
    rating: float = 0.0
    incentivized: bool = False
    verified: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
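As a quick illustration of what __post_init__() buys us, an empty name gets replaced with a default string automatically (the values here are made up):

```python
review = ReviewData(name="", rating=4.0, verified=True)
print(review.name)  # "No name" -- empty strings get a default via __post_init__()
```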
To store this data, we open a new DataPipeline within our parsing function and pass each ReviewData object into the pipeline as we parse it.
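The shape of that pattern is sketched below with a hypothetical helper, save_reviews_for_row(); the real code inlines this logic inside process_item(), but the idea is the same: one pipeline (and one CSV file, named after the product) per row from the crawl.

```python
# Hypothetical helper showing the per-product review pipeline pattern.
def save_reviews_for_row(row, parsed_reviews):
    # parsed_reviews is assumed to be a list of ReviewData instances.
    review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
    for review in parsed_reviews:
        review_pipeline.add_data(review)
    review_pipeline.close_pipeline()
```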
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" model_number: str = "" sku: str = "" rating: float = 0.0 spoonsored: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" rating: float = 0.0 incentivized: bool = False verified: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="shop-sku-list-item") for div_card in div_cards: sponsored = False sponsored_tag = div_card.find("div", class_="is-sponsored") if sponsored_tag: sponsored = True name = div_card.find("h4", class_="sku-title").text price_holder = div_card.select_one("div[data-testid='customer-price']") price = price_holder.select_one("span[aria-hidden='true']").text model_holder = div_card.find("div", class_="sku-model") model_info_array = model_holder.find_all("span", class_="sku-value") model_number = model_info_array[0].text sku_number = model_info_array[1].text rating_holder = div_card.find("div", class_="ratings-reviews") href = rating_holder.find("a") link = "n/a" if href: link = f"https://www.bestbuy.com{href.get('href')}" rating_text = rating_holder.find("p", class_="visually-hidden").text rating = 0.0 if rating_text != "Not Yet Reviewed": rating = rating_text.split(" ")[1] search_data = SearchData( name=name, url=link, price=price, model_number=model_number, sku=sku_number, rating=rating, spoonsored=sponsored ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] if url == "n/a": return tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = soup.find_all("li", class_="review-item-simple") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review_card in review_cards: rating_holder = review_card.find("div", class_="review-rating") rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1]) name = review_card.find("h4").text incentivized = False incentivized_button = review_card.select_one("button[title='badge for Incentivized']") if incentivized_button: incentivized 
= True verified = False verified_button = review_card.select_one("button[title='badge for Verified Purchaser']") if verified_button: incentivized = True review_data = ReviewData( name=name, rating=rating, incentivized=incentivized, verified=verified ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
To review: we now open a new DataPipeline inside of our parsing function and pass ReviewData into the pipeline as it gets parsed. All that's left is to add concurrency to the scraper as well. The for loop from process_results() is going to get replaced by a call to ThreadPoolExecutor.
```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
- process_item is the function we want to call on each thread.
- reader is the array of search result items we want to look up and parse.
- All other arguments get passed in as arrays, just like before.

Finally, to finish off the scraper, we route our review requests through the proxy as well. We only need to change one line of process_item():
```python
response = requests.get(get_scrapeops_url(url, location=location))
```

Here is our full code with everything put together.

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    model_number: str = ""
    sku: str = ""
    rating: float = 0.0
    sponsored: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    rating: float = 0.0
    incentivized: bool = False
    verified: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="shop-sku-list-item")

            for div_card in div_cards:
                sponsored = False
                sponsored_tag = div_card.find("div", class_="is-sponsored")
                if sponsored_tag:
                    sponsored = True

                name = div_card.find("h4", class_="sku-title").text
                price_holder = div_card.select_one("div[data-testid='customer-price']")
                price = price_holder.select_one("span[aria-hidden='true']").text

                model_holder = div_card.find("div", class_="sku-model")
                model_info_array = model_holder.find_all("span", class_="sku-value")
                model_number = model_info_array[0].text
                sku_number = model_info_array[1].text

                rating_holder = div_card.find("div", class_="ratings-reviews")
                href = rating_holder.find("a")
                link = "n/a"
                if href:
                    link = f"https://www.bestbuy.com{href.get('href')}"
                rating_text = rating_holder.find("p", class_="visually-hidden").text
                rating = 0.0
                if rating_text != "Not Yet Reviewed":
                    rating = rating_text.split(" ")[1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    model_number=model_number,
                    sku=sku_number,
                    rating=rating,
                    sponsored=sponsored
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    if url == "n/a":
        return
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.find_all("li", class_="review-item-simple")
                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

                for review_card in review_cards:
                    rating_holder = review_card.find("div", class_="review-rating")
                    rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
                    name = review_card.find("h4").text

                    incentivized = False
                    incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
                    if incentivized_button:
                        incentivized = True

                    verified = False
                    verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
                    if verified_button:
                        verified = True

                    review_data = ReviewData(
                        name=name,
                        rating=rating,
                        incentivized=incentivized,
                        verified=verified
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True
            else:
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["gpu"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
Time to run the finished scraper in production. Once again, feel free to adjust the settings in main.
```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["gpu"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
Whenever you scrape a site, you should respect its terms of service and robots.txt. Violating these policies can result in suspension or even a permanent ban.