

How to Scrape BestBuy With Requests and BeautifulSoup

BestBuy has been around since 1966, when it was originally named Sound of Music, and it has been a major player in the electronics market for decades. Much like Amazon or WalMart, BestBuy holds tons of product information that we can scrape.

Today, we'll scrape GPU listings from BestBuy and collect all sorts of useful information. After scraping the GPUs, we'll go through and scrape the reviews for each one.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape BestBuy

If you need a scraper but don't have time to read, look no further!

  1. Create a new project folder.
  2. Inside your new project, add a config.json file.
  3. Inside your config file, add your ScrapeOps API key, {"api_key": "your-super-secret-api-key"}.
  4. Afterward, copy/paste the code below into a new Python file and you're all set to run it with python name_of_your_script.py!
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
rating: float = 0.0
incentivized: bool = False
verified: bool = False

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")

for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True

name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"

rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]

search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_item(row, location, retries=3):
url = row["url"]
if url == "n/a":
return
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("li", class_="review-item-simple")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

for review_card in review_cards:
rating_holder = review_card.find("div", class_="review-rating")
rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
name = review_card.find("h4").text

incentivized = False
incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
if incentivized_button:
incentivized = True

verified = False
verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
if verified_button:
verified = True

review_data = ReviewData(
name=name,
rating=rating,
incentivized=incentivized,
verified=verified
)
review_pipeline.add_data(review_data)

review_pipeline.close_pipeline()
success = True

else:
raise Exception(f"Failed Request, status code: {response.status_code}")

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

To change your results, feel free to change any of the following:

  • MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
  • MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
  • PAGES: Determines how many pages of BestBuy search results to scrape for each keyword.
  • LOCATION: Specifies the geographical location (country) for the BestBuy search.
  • keyword_list: This is a list of keywords for which the script will perform the search and subsequent scraping.

How To Architect Our BestBuy Scraper

To scrape BestBuy effectively, we need to write both a result crawler and a review scraper.

  1. Our result crawler will perform a keyword search and save all the results from the search.
  2. Then, our review scraper is going to read the CSV file from the crawl and scrape reviews for each item.

The result crawler will be built in the following steps:

  1. Build a search parser.
  2. Add pagination to our results.
  3. Store our data in a CSV file.
  4. Add concurrency to crawl multiple pages simultaneously.
  5. Integrate with the ScrapeOps Proxy Aggregator to bypass anti-bots.

We'll run through these next steps when building our review scraper.

  1. Build a review parser.
  2. Read the data from our CSV file.
  3. Store the extracted review data in a CSV file.
  4. Add concurrency to scrape multiple products at once.
  5. Once again, use proxy integration to get past any anti-bots.

Understanding How To Scrape BestBuy

Before scraping our site, we need a solid understanding of exactly how to access the website and how to extract the data from it. In the coming sections, we'll go through:

  • How to GET BestBuy Pages
  • How to Extract Data from BestBuy Pages
  • How to Control Our Pagination
  • How to Control Our Geolocation

Step 1: How To Request BestBuy Pages

We'll start with how to request webpages from BestBuy. Everything begins with a GET request.

  • When we GET a page, the server sends our response back as HTML.
  • The key difference between Python Requests and our browser: the browser reads the HTML and renders the webpage for us to see.
  • With Python Requests, instead of reading and rendering the page, we need to code our scraper to dig through the HTML and extract information.

Take a look at the URL for page 1 of our search results:

https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu

Our base URL is:

https://www.bestbuy.com/site/searchpage.jsp

Our query string begins with ? and each query param is separated by &. The one we need to pay attention to here is st=gpu. st represents our search term, in this case, gpu.

Search Results
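
If you'd rather not assemble the query string by hand, here's a minimal sketch that builds the same search URL with urlencode. build_search_url() is just a hypothetical helper for illustration:

from urllib.parse import urlencode

# Hypothetical helper: build a BestBuy search URL for a given keyword.
def build_search_url(keyword):
    query = urlencode({"st": keyword})
    return f"https://www.bestbuy.com/site/searchpage.jsp?{query}"

print(build_search_url("gpu"))
# https://www.bestbuy.com/site/searchpage.jsp?st=gpu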

Now, let's take a look at BestBuy's individual product pages. Here is an example URL: https://www.bestbuy.com/site/asus-tuf-gaming-nvidia-geforce-rtx-4080-super-overclock-16gb-gddr6x-pci-express-4-0-graphics-card-black/6574587.p?skuId=6574587.

As you can see in our URL, it gets laid out like this:

https://www.bestbuy.com/site/{PRODUCT_NAME}/{SKU_NUMBER}.p?skuId={SKU_NUMBER}

If you scroll down the page, you'll get to the reviews section.

Review Results
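
To illustrate that layout, here's a small sketch that rebuilds the example URL above from its slug and SKU. build_product_url() is a hypothetical helper, not something our scraper actually needs:

# Hypothetical helper: rebuild a product URL from its slug and SKU number.
def build_product_url(product_slug, sku_number):
    return f"https://www.bestbuy.com/site/{product_slug}/{sku_number}.p?skuId={sku_number}"

print(build_product_url(
    "asus-tuf-gaming-nvidia-geforce-rtx-4080-super-overclock-16gb-gddr6x-pci-express-4-0-graphics-card-black",
    "6574587"
))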


Step 2: How To Extract Data From BestBuy Results and Pages

Let's inspect the pages we just looked at and see how to extract our data. On both pages, the data is nested inside the HTML of the page. When we inspect, we'll see exactly where this data is located.

On the search results page, each product holds information inside a div with a class of shop-sku-list-item. When we find this item, we can pick through it and find all of our relevant data.

Search Page HTML Inspection

On the product page, we follow a similar structure with reviews. Each review is held in an li element with a class of review-item-simple.

Reviews HTML Inspection
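
If you want to experiment with these selectors before writing the full scraper, here's a minimal sketch. It assumes you've saved a search page and a product page to local HTML files (the filenames are placeholders):

from bs4 import BeautifulSoup

# Load locally saved copies of a search page and a product page.
with open("search-page.html", "r", encoding="utf-8") as f:
    search_soup = BeautifulSoup(f.read(), "html.parser")

with open("product-page.html", "r", encoding="utf-8") as f:
    product_soup = BeautifulSoup(f.read(), "html.parser")

# Each search result sits in a div with the class shop-sku-list-item.
product_cards = search_soup.find_all("div", class_="shop-sku-list-item")
# Each review sits in an li with the class review-item-simple.
review_cards = product_soup.find_all("li", class_="review-item-simple")

print(f"Found {len(product_cards)} products and {len(review_cards)} reviews")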


Step 3: How To Control Pagination

Pagination is easy to overlook, but if you know where to look, it's actually pretty simple to control. Think back to our URL from earlier:

https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu

There is one other param in our query string, cp=1. cp represents our page number.
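
For example, this little loop prints the first five paginated search URLs for our gpu keyword:

# Print the first five paginated search URLs.
# range() starts at 0, so we add 1 to get pages 1 through 5.
keyword = "gpu"
for page_number in range(5):
    print(f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={keyword}")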


Step 4: Geolocated Data

To handle geolocation, we're going to take full advantage of the ScrapeOps Proxy API. When we talk to the ScrapeOps server, we can pass a country parameter, and ScrapeOps will route us through the country of our choice.

  • If we want to show up in the US, we pass "country": "us".
  • If we want to appear in the UK, we can pass "country": "uk".

You can view a list of our supported locations here.
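
For instance, here's a minimal sketch of the proxy payload we'll build later in this article, routed through the UK instead of the US (the API key is a placeholder):

from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"

payload = {
    "api_key": API_KEY,
    "url": "https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu",
    "country": "uk",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)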


Setting Up Our BestBuy Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir bestbuy-scraper

cd bestbuy-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4
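
Every script in this article reads your ScrapeOps API key from a config.json file in the project folder. If you'd rather create that file from Python, here's a quick sketch (the key shown is a placeholder):

import json

# Write the config.json file our scripts expect, with a placeholder API key.
with open("config.json", "w") as config_file:
    json.dump({"api_key": "your-super-secret-api-key"}, config_file)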

Build A BestBuy Search Crawler

As previously mentioned, we'll build our crawler first. Our crawler needs to perform a search and parse the search results. After parsing these results, it needs to save them to a CSV file.

A good crawler should parse multiple pages with concurrency and bypass anti-bots using a proxy.

Our crawler should execute the following steps:

  1. Parse a search results page.
  2. Paginate our search results.
  3. Store our data.
  4. Concurrently run steps 1 through 3.
  5. Integrate with a proxy to prevent us from getting blocked.

Step 1: Create Simple Search Data Parser

We'll get started with a simple parsing function. In this version of our code, we'll add error handling, retry logic and the parsing function we just mentioned.

Pay special attention to the parsing logic laid out in scrape_search_results().

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?st={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")

for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True

name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"

rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]

search_data = {
"name": name,
"url": link,
"price": price,
"model_number": model_number,
"sku": sku_number,
"rating": rating,
"spoonsored": sponsored
}
print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

Here are the key takeaways from scrape_search_results():

  • soup.find_all("div", class_="shop-sku-list-item") finds all of our items on the page.
  • We get our item name with div_card.find("h4", class_="sku-title").text.
  • div_card.find("div", class_="is-sponsored") lets us know whether or not the item is sponsored.
  • div_card.select_one("div[data-testid='customer-price']") finds our price_holder.
  • price_holder.select_one("span[aria-hidden='true']").text extracts our price information.
  • div_card.find("div", class_="sku-model") finds our model_holder.
  • After finding the model_holder, we go through and extract both the model_number and sku_number from it.
  • We then find the rating_holder to extract both our rating and the link to the product (a short sketch of the rating parsing follows this list).
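
The example string below is illustrative; the real text comes from the visually-hidden paragraph inside the ratings-reviews div:

# Illustrative example of the rating text we parse.
rating_text = "Rating 4.6 out of 5 stars with 338 reviews"

rating = 0.0
if rating_text != "Not Yet Reviewed":
    # The second word holds the numeric rating.
    rating = rating_text.split(" ")[1]

print(rating)  # 4.6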

Step 2: Add Pagination

Adding pagination is quite simple. If you remember from earlier, our page number is denoted by the cp parameter.

We'll reformat our URL to look like this:

https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}

We use page_number+1 because Python's built-in range() function begins counting at 0, but our pages begin at 1. We also need to write a function that calls scrape_search_results() on multiple pages.

Here is our new function, start_scrape().

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)

Our full code now looks like this.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")

for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True

name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"

rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]

search_data = {
"name": name,
"url": link,
"price": price,
"model_number": model_number,
"sku": sku_number,
"rating": rating,
"spoonsored": sponsored
}
print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:

start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")
  • Our URLs are now formatted to support pagination.
  • start_scrape() allows us to parse a list of pages.

Step 3: Storing the Scraped Data

The whole point of scraping is to store the data we collect. Storing the data from our crawl gives us a readable CSV report. To accomplish this, we'll create a new dataclass and build a DataPipeline that saves that dataclass to a CSV file.

Here is our new dataclass. We'll call it SearchData.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    model_number: str = ""
    sku: str = ""
    rating: float = 0.0
    sponsored: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here is the DataPipeline we pass it into. It opens a pipe to a CSV file and removes duplicates based on their name.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
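
Before wiring it into the parser, here's a quick usage sketch. It assumes the SearchData class from above; example.csv and the placeholder product are just for illustration:

pipeline = DataPipeline(csv_filename="example.csv")

# The second add is a duplicate by name, so the pipeline drops it.
pipeline.add_data(SearchData(name="Example GPU", url="n/a", price="$499.99"))
pipeline.add_data(SearchData(name="Example GPU", url="n/a", price="$499.99"))

# Flush anything left in the queue out to example.csv.
pipeline.close_pipeline()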

Once we put everything together, it looks like this.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")

for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True

name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"

rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]

search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • Inside of our main, we open a DataPipeline and pass it into start_scrape().
  • The DataPipeline then gets passed into our parsing function.
  • Inside the parsing function, we turn our extracted data into SearchData and pass it into the DataPipeline once it's been parsed.
  • When the crawl is complete, we close the pipeline.

Step 4: Adding Concurrency

To add concurrency, we're going to use ThreadPoolExecutor. This opens up a new set of threads with whatever limit we choose. On each of these threads, we call a function. This function will then run simultaneously on each thread, giving us the ability to parse multiple pages at once.

To accomplish this, we're going to replace our for loop with something better inside start_scrape().

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

executor.map() holds all of our key logic here. Pay attention to the arguments:

  • scrape_search_results is the function we want to call on each thread.
  • All other arguments are passed in as lists; executor.map() hands their elements to scrape_search_results one at a time (see the short sketch below).
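
Here's a toy sketch of that behavior, separate from our scraper. Each call receives one element from each iterable, in order:

import concurrent.futures

def greet(keyword, page):
    return f"Scraping page {page} of '{keyword}'"

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Calls greet("gpu", 1), greet("gpu", 2), greet("gpu", 3) across the threads.
    results = executor.map(greet, ["gpu"] * 3, range(1, 4))
    for result in results:
        print(result)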

Here is our full code up to this point.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")

for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True

name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"

rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]

search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 5: Bypassing Anti-Bots

Anti-bot software is designed to catch malicious software and block it from accessing the site. Our scraper isn't malicious, but it is a bot.

To get past anti-bots, we're going to use the ScrapeOps Proxy Aggregator API.

Take a look at our proxy function, get_scrapeops_url().

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

This function builds a payload from our arguments and returns a proxied URL; a quick usage sketch follows the list below. Pay close attention to the payload:

  • "api_key": you ScrapeOps API key.
  • "url": the url you want to scrape.
  • "country": the country we wish to appear in.

Here is our production-ready crawler.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")

for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True

name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"

rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]

search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

Time to test it out! We're going to crawl 5 pages on 5 threads. You can view our updated main in the snippet below.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Feel free to change any of the following to tweak your results:

  • MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
  • MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
  • PAGES: Determines how many pages of BestBuy search results to scrape for each keyword.
  • LOCATION: Specifies the geographical location (country) for the BestBuy search.
  • keyword_list: This is a list of keywords for which the script will perform the search and subsequent scraping.

Here are our results.

Crawler Results Terminal

We crawled 5 pages of GPUs in 29.862 seconds. 29.862 / 5 = 5.972 seconds per page.


Build A BestBuy Scraper

Now that our crawler is giving us a report, we're going to build a review scraper. This scraper is going to perform the following actions:

  1. Read the CSV file.
  2. Parse review data from each row in the CSV.
  3. Store our parsed review data.
  4. Use concurrency to run steps 2 and 3 on multiple items at the same time.
  5. Integrate with the ScrapeOps API in order to bypass anti-bots and avoid anything else that might block us.

Step 1: Create Simple Product Data Parser

Just like we did earlier, we're going to start by building a simple data parser. This should feel quite familiar. Once again, pay attention to the parsing logic.

def process_item(row, location, retries=3):
    url = row["url"]
    if url == "n/a":
        return
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.find_all("li", class_="review-item-simple")

                for review_card in review_cards:
                    rating_holder = review_card.find("div", class_="review-rating")
                    rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
                    name = review_card.find("h4").text

                    incentivized = False
                    incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
                    if incentivized_button:
                        incentivized = True

                    verified = False
                    verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
                    if verified_button:
                        verified = True

                    review_data = {
                        "name": name,
                        "rating": rating,
                        "incentivized": incentivized,
                        "verified": verified
                    }
                    print(review_data)

                success = True

            else:
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

In each review_card, we follow these steps to extract our data.

  • review_card.find("div", class_="review-rating") finds the rating_holder.
  • float(rating_holder.find("p", class_="visually-hidden").text.split()[1]) gives our rating.
  • review_card.find("h4").text gives our name.
  • review_card.select_one("button[title='badge for Incentivized']") tells us whether or not the review was incentivized.
  • We use review_card.select_one("button[title='badge for Verified Purchaser']") to determine whether the purchase was verified.

Step 2: Loading URLs To Scrape

In order to use our parsing function, we need to give it a URL. Here, we'll write a function that reads our CSV file into an array of dict objects and then passes each of those objects into process_item().

Here is process_results().

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)

When you combine it with process_item() and add the whole thing to our code, it looks like this.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")

for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True

name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"

rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]

search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_item(row, location, retries=3):
url = row["url"]
if url == "n/a":
return
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("li", class_="review-item-simple")

for review_card in review_cards:
rating_holder = review_card.find("div", class_="review-rating")
rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
name = review_card.find("h4").text

incentivized = False
incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
if incentivized_button:
incentivized = True

verified = False
verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
if verified_button:
verified = True

review_data = {
"name": name,
"rating": rating,
"incentivized": incentivized,
"verified": verified
}
print(review_data)

success = True

else:
raise Exception(f"Failed Request, status code: {response.status_code}")

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_item(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

We've already got almost everything we need to store our data; we just need one more dataclass. Since this one represents reviews, we'll call it ReviewData. It's very similar to SearchData.

@dataclass
class ReviewData:
    name: str = ""
    rating: float = 0.0
    incentivized: bool = False
    verified: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In the code below, we open a new DataPipeline within our parsing function. We then pass ReviewData into the pipeline.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
rating: float = 0.0
incentivized: bool = False
verified: bool = False

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")

for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True

name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"

rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]

search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_item(row, location, retries=3):
url = row["url"]
if url == "n/a":
return
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("li", class_="review-item-simple")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

for review_card in review_cards:
rating_holder = review_card.find("div", class_="review-rating")
rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
name = review_card.find("h4").text

incentivized = False
incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
if incentivized_button:
incentivized = True

verified = False
verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
if verified_button:
verified = True

review_data = ReviewData(
name=name,
rating=rating,
incentivized=incentivized,
verified=verified
)
review_pipeline.add_data(review_data)

review_pipeline.close_pipeline()
success = True

else:
raise Exception(f"Failed Request, status code: {response.status_code}")

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_item(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
  • We open a new DataPipeline inside our parsing function.
  • We pass ReviewData into the pipeline as it gets parsed.
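
One detail worth calling out: each product gets its own review CSV, named after the product itself. Here's a tiny standalone sketch of that filename transformation (the product name below is just an example):

# Example only -- shows how the per-product review CSV filename is built.
row = {"name": "NVIDIA GeForce RTX 4070 12GB GDDR6X"}
csv_filename = f"{row['name'].replace(' ', '-')}.csv"
print(csv_filename)   # NVIDIA-GeForce-RTX-4070-12GB-GDDR6X.csv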

Step 4: Adding Concurrency

Concurrency gets added exactly the way it did before: the for loop inside process_results() is replaced with a call to ThreadPoolExecutor.

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)

This time:

  • process_item is the function we want to call on each thread.
  • reader is the array of search result items we want to look up and parse.
  • All other args get passed in as arrays of the same length, so executor.map() can pair them up call by call (see the minimal sketch below).
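
If the multiple-iterable form of executor.map() is new to you, here's a minimal, self-contained sketch (with a dummy stand-in for process_item()) showing how the arguments line up for each call:

import concurrent.futures

# Dummy stand-in for process_item() -- only here to show how executor.map
# pulls one element from each iterable per call.
def show_call(row, location, retries=3):
    print(f"row={row}, location={location}, retries={retries}")

reader = [{"url": "a"}, {"url": "b"}, {"url": "c"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(
        show_call,
        reader,                  # a different row for each call
        ["us"] * len(reader),    # the same location for every call
        [3] * len(reader)        # the same retry count for every call
    )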

Step 5: Bypassing Anti-Bots

We've already discussed what we need to do to get past anti-bots, and we've already written a proxy function. Now we just need to use it in the right place by changing a single line in process_item().

response = requests.get(get_scrapeops_url(url, location=location))
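
If you want to sanity-check what the wrapped request looks like, here's a small standalone sketch that builds the same proxy URL by hand (the API key is a placeholder, and the BestBuy URL is just an example):

from urllib.parse import urlencode

# Placeholder key -- in the real scraper this comes from config.json.
API_KEY = "your-super-secret-api-key"

payload = {
    "api_key": API_KEY,
    "url": "https://www.bestbuy.com/site/searchpage.jsp?st=gpu",
    "country": "us",
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))
# The target URL gets percent-encoded into the "url" query parameter, so the
# request goes to the ScrapeOps proxy first and then on to BestBuy.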

Here is our full code, ready to run in production.

import os
import time  # needed for time.sleep() in DataPipeline.close_pipeline()
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
rating: float = 0.0
incentivized: bool = False
verified: bool = False

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")

for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True

name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"

rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]

search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_item(row, location, retries=3):
url = row["url"]
if url == "n/a":
return
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("li", class_="review-item-simple")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

for review_card in review_cards:
rating_holder = review_card.find("div", class_="review-rating")
rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
name = review_card.find("h4").text

incentivized = False
incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
if incentivized_button:
incentivized = True

verified = False
verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
if verified_button:
verified = True

review_data = ReviewData(
name=name,
rating=rating,
incentivized=incentivized,
verified=verified
)
review_pipeline.add_data(review_data)

review_pipeline.close_pipeline()
success = True

else:
raise Exception(f"Failed Request, status code: {response.status_code}")

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Now we're going to run the same crawl we ran earlier, except this time we'll also scrape reviews for each GPU found during the crawl. As before, feel free to change any of the constants to tweak your results.

Here is our updated main.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Here are our results.

Scraper Performance Terminal

The full run took 231.985 seconds. If you remember, our initial crawl took 29.862 seconds.

The crawl generated a report with 72 results, so 231.985 - 29.862 = 202.123 seconds were spent scraping reviews. 202.123 seconds / 72 pages = 2.807 seconds per page.

This is extremely fast... roughly twice as fast per page as our initial crawl (which was decent to begin with).
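
For reference, here's the same arithmetic as a tiny Python snippet (numbers copied from the run above):

# Timing numbers from the production run above.
total_run = 231.985    # seconds for the full run (crawl + review scrape)
crawl_time = 29.862    # seconds the initial crawl took
pages = 72             # product pages scraped for reviews

scrape_time = total_run - crawl_time
print(round(scrape_time, 3))           # 202.123
print(round(scrape_time / pages, 3))   # 2.807 seconds per page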


Legal and Ethical Considerations

Scraping the web is generally considered legal as long as you're scraping public data. Public data is any data not gated behind a login.

Private data is a completely different story. With private data, you're subject to an entirely different set of rules and regulations. If you're not sure about your scraper, consult an attorney.

Along with the legality of your scrape, you also need to pay attention to the site's own rules. Particularly, you need to pay attention to their Terms and Conditions and the robots.txt. Violating these policies can result in suspension or even a permanent ban.
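
If you'd like to check a URL against a site's robots.txt programmatically before scraping it, Python's built-in urllib.robotparser can do that. Here's a minimal sketch (the URL below is just an example):

from urllib.robotparser import RobotFileParser

# Minimal robots.txt check using only the standard library.
robots = RobotFileParser()
robots.set_url("https://www.bestbuy.com/robots.txt")
robots.read()

# Example URL only -- substitute whatever page your scraper targets.
url = "https://www.bestbuy.com/site/searchpage.jsp?st=gpu"
print(robots.can_fetch("*", url))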


Conclusion

You now know how to scrape BestBuy. You got a crash course in iterative building and you know how to pull nested data out of an HTML page. To get a better understanding of the tech stack used in this article, take a look at the links below.


More Python Web Scraping Guides

If you enjoyed this article, check out some other ones from the Python Web Scraping Playbook. We've got learning resources for developers of all experience levels. If you'd like to learn more from our "How To Scrape" series, take a look at the links below.