How to Scrape BestBuy With Requests and BeautifulSoup
BestBuy has been around since 1966 and was originally named Sound of Music. It has been a huge player in the electronics market for decades. Similar to Amazon or Walmart, we can scrape tons of product information from BestBuy.
Today, we'll be scraping GPUs from BestBuy and we'll get all sorts of useful information. After scraping the GPUs, we'll go through and scrape reviews for each GPU.
- TLDR: How to Scrape BestBuy
- How To Architect Our Scraper
- Understanding How To Scrape BestBuy
- Setting Up Our BestBuy Scraper
- Build A BestBuy Search Crawler
- Build A BestBuy Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape BestBuy
If you need a scraper but don't have time to read, look no further!
- Create a new project folder.
- Inside your new project, add a config.json file.
- Inside your config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
- Afterward, copy/paste the code below into a new Python file and you're all set to run it with python name_of_your_script.py!
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
rating: float = 0.0
incentivized: bool = False
verified: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")
for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True
name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"
rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]
search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
if url == "n/a":
return
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("li", class_="review-item-simple")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for review_card in review_cards:
rating_holder = review_card.find("div", class_="review-rating")
rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
name = review_card.find("h4").text
incentivized = False
incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
if incentivized_button:
incentivized = True
verified = False
verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
if verified_button:
verified = True
review_data = ReviewData(
name=name,
rating=rating,
incentivized=incentivized,
verified=verified
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To change your results, feel free to change any of the following:
- MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
- MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
- PAGES: Determines how many pages of BestBuy search results to scrape for each keyword.
- LOCATION: Specifies the geographical location (country) for the search.
- keyword_list: The list of keywords for which the script will perform the search and subsequent scraping.
How To Architect Our BestBuy Scraper
To scrape BestBuy effectively, we need to write both a result crawler and a review scraper.
- Our result crawler will perform a keyword search and save all the results from the search.
- Then, our review scraper is going to read the CSV file from the crawl and scrape reviews for each item.
The result crawler will be built in the following steps:
- Building a search parser.
- Add pagination to our results.
- Store our data in a CSV file.
- Add concurrency to crawl multiple pages simultaneously.
- Integrate with the ScrapeOps Proxy Aggregator to bypass anti-bots.
We'll then run through the following steps when building our review scraper.
- Building a review parser.
- Read the data from our CSV file.
- Store the extracted data in a CSV file.
- Adding concurrency to scrape multiple products at once.
- Once again, use proxy integration to get past any anti-bots.
Understanding How To Scrape BestBuy
Before scraping our site, we need a solid understanding of exactly how to access the website and how to extract the data from it. In the coming sections, we'll go through:
- How to GET BestBuy Pages
- How to Extract Data from BestBuy Pages
- How to Control Our Pagination
- How to Control Our Geolocation
Step 1: How To Request BestBuy Pages
We'll start with how to request webpages from BestBuy. Everything begins with a GET request.
- When we GET a page, the server sends our response back as HTML.
- The key difference between Python Requests and our browser: the browser reads the HTML and renders the webpage for us to see.
- With Python Requests, instead of reading and rendering the page, we need to code our scraper to dig through the HTML and extract information.
Take a look at the URL for page 1 of our search results:
https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu
Our base URL is:
https://www.bestbuy.com/site/searchpage.jsp
Our query string begins with ? and each query param is separated by &. The one we need to pay attention to here is st=gpu. st represents our search term, in this case, gpu.
Now, let's take a look at BestBuy's individual product pages. Here is an example URL: https://www.bestbuy.com/site/asus-tuf-gaming-nvidia-geforce-rtx-4080-super-overclock-16gb-gddr6x-pci-express-4-0-graphics-card-black/6574587.p?skuId=6574587.
As you can see, the URL is laid out like this:
https://www.bestbuy.com/site/{PRODUCT_NAME}/{SKU_NUMBER}.p?skuId={SKU_NUMBER}
If you scroll down the page, you'll get to the reviews section.
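As a quick sketch, here's how you might build and request one of these search URLs with Python Requests. The helper name build_search_url() is just for illustration, and note that a plain request like this will often get blocked by BestBuy's anti-bot systems, which is why we integrate a proxy later on.
import requests

def build_search_url(keyword, page=1):
    # st is the search term, cp is the page number
    formatted_keyword = keyword.replace(" ", "+")
    return f"https://www.bestbuy.com/site/searchpage.jsp?cp={page}&st={formatted_keyword}"

url = build_search_url("gpu", page=1)
response = requests.get(url)
# The server sends the raw HTML back in response.text
print(response.status_code)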
Step 2: How To Extract Data From BestBuy Results and Pages
Let's inspect the pages we just looked at and see how to extract our data. On both pages, the data is nested inside the HTML of the page. When we inspect, we'll see exactly where this data is located.
On the search results page, each product holds its information inside a div with a class of shop-sku-list-item. When we find this element, we can pick through it and find all of our relevant data.
On the product page, we follow a similar structure with reviews. Each review is held in an li element with a class of review-item-simple.
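Here's a minimal sketch of how we'll locate these containers with BeautifulSoup. It fetches a search page directly (which may get blocked without a proxy), and the commented-out line shows the same idea for reviews on a product page.
import requests
from bs4 import BeautifulSoup

# Fetch a search results page (without a proxy, this may be blocked)
html = requests.get("https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu").text
soup = BeautifulSoup(html, "html.parser")

# Each product card on the search results page
product_cards = soup.find_all("div", class_="shop-sku-list-item")
print(f"Found {len(product_cards)} product cards")

# On a product page, each review is found the same way:
# review_cards = soup.find_all("li", class_="review-item-simple")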
Step 3: How To Control Pagination
Pagination is hard to find, but if you know where to look, it's actually pretty simple to control. Think back to our URL from earlier:
https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu
There is one other param in our query string, cp=1. cp represents our page number.
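For example, here's a small sketch that builds the URLs for the first three pages of a search. We add 1 to each page number because Python's range() starts counting at 0, while BestBuy's pages start at 1.
keyword = "gpu"
formatted_keyword = keyword.replace(" ", "+")

# Build the URLs for pages 1 through 3 of the search
urls = [
    f"https://www.bestbuy.com/site/searchpage.jsp?cp={page+1}&st={formatted_keyword}"
    for page in range(3)
]
for url in urls:
    print(url)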
Step 4: Geolocated Data
To handle geolocation, we're going to take full advantage of the ScrapeOps Proxy API. When we talk to the ScrapeOps server, we can pass a country parameter, and ScrapeOps will route us through the country of our choice.
- If we want to show up in the US, we pass "country": "us".
- If we want to appear in the UK, we can pass "country": "uk".
You can view a list of our supported locations here.
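Here's a small sketch of what that looks like. The country value is just another key in the payload we send to the ScrapeOps proxy, exactly as you'll see in get_scrapeops_url() later in this article.
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"

payload = {
    "api_key": API_KEY,
    "url": "https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu",
    "country": "uk",  # ScrapeOps routes this request through the UK
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)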
Setting Up Our BestBuy Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir bestbuy-scraper
cd bestbuy-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build A BestBuy Search Crawler
As previously mentioned, we'll build our crawler first. Our crawler needs to perform a search and parse the search results. After parsing these results, it needs to save them to a CSV file.
A good crawler should parse multiple pages concurrently and bypass anti-bots using a proxy.
Our crawler should execute the following steps:
- Parse a search results page.
- Paginate our search results.
- Store our data.
- Concurrently run steps 1 through 3.
- Integrate with a proxy to prevent us from getting blocked.
Step 1: Create Simple Search Data Parser
We'll get started with a simple parsing function. In this version of our code, we'll add error handling, retry logic and the parsing function we just mentioned.
Pay special attention to the parsing logic laid out in scrape_search_results().
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?st={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")
for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True
name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"
rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]
search_data = {
"name": name,
"url": link,
"price": price,
"model_number": model_number,
"sku": sku_number,
"rating": rating,
"spoonsored": sponsored
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
Here are the key takeaways from scrape_search_results():
- soup.find_all("div", class_="shop-sku-list-item") finds all of our items on the page.
- We get our item name with div_card.find("h4", class_="sku-title").text.
- div_card.find("div", class_="is-sponsored") lets us know whether or not the item is sponsored.
- div_card.select_one("div[data-testid='customer-price']") finds our price_holder.
- price_holder.select_one("span[aria-hidden='true']").text extracts our price information.
- div_card.find("div", class_="sku-model") finds our model_holder.
- After finding the model_holder, we go through and extract both the model_number and sku_number from it.
- We then find the rating_holder to extract both our rating and the link to the product.
Step 2: Add Pagination
Adding pagination is quite simple. If you remember from earlier, our page number is denoted by the cp parameter.
We'll reformat our URL to look like this:
https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}
We use page_number+1 because Python's built-in range() function begins counting at 0, but our pages begin at 1. We also need to write a function that calls scrape_search_results() on multiple pages.
Here is our new function, start_scrape().
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
Our full code now looks like this.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")
for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True
name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"
rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]
search_data = {
"name": name,
"url": link,
"price": price,
"model_number": model_number,
"sku": sku_number,
"rating": rating,
"spoonsored": sponsored
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- Our URLs are now formatted to support pagination.
- start_scrape() allows us to parse a list of pages.
Step 3: Storing the Scraped Data
The whole purpose of scraping is to store the data. We store the data from our crawl to create a readable CSV report. To accomplish this, we're going to create a new dataclass, and we're going to build a DataPipeline that saves that dataclass to a CSV file.
Here is our new dataclass. We'll call it SearchData.
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is the DataPipeline we pass it into. It opens a pipe to a CSV file and removes duplicate items based on their name.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
Once we put everything together, it looks like this.
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")
for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True
name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"
rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]
search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- Inside of our main, we open a DataPipeline and pass it into start_scrape().
- The DataPipeline then gets passed into our parsing function.
- Inside the parsing function, we turn our extracted data into SearchData and pass it into the DataPipeline once it's been parsed.
- When the crawl is complete, we close the pipeline.
Step 4: Adding Concurrency
To add concurrency, we're going to use ThreadPoolExecutor. This opens up a new set of threads with whatever limit we choose. On each of these threads, we call a function. This function will then run simultaneously on each thread, giving us the ability to parse multiple pages at once.
To accomplish this, we're going to replace our for loop with something better inside start_scrape().
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
executor.map() holds all of our key logic here; pay attention to the arguments:
- scrape_search_results is the function we want to call on each thread.
- All other arguments are passed in as arrays, which in turn get passed into scrape_search_results (see the short sketch below).
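Here's a tiny standalone sketch, using a hypothetical fake_scrape() stand-in rather than our real parsing function, that shows how executor.map() pairs the arrays element by element and calls the function once per page.
import concurrent.futures

def fake_scrape(keyword, location, page_number):
    # Stand-in for scrape_search_results(); it just reports its arguments
    print(f"Scraping '{keyword}' in {location}, page {page_number + 1}")

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(
        fake_scrape,
        ["gpu"] * pages,   # same keyword for every call
        ["us"] * pages,    # same location for every call
        range(pages)       # a different page number for each call
    )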
Here is our full code up to this point.
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")
for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True
name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"
rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]
search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 5: Bypassing Anti-Bots
Anti-bot software is designed to catch malicious software and block it from accessing the site. Our scraper isn't malicious, but it is a bot.
To get past anti-bots, we're going to use the ScrapeOps Proxy Aggregator API.
Take a look at our proxy function, get_scrapeops_url().
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
This function takes in a payload and builds a proxied URL; a short usage example follows the list below. Pay close attention to the payload:
- "api_key": your ScrapeOps API key.
- "url": the URL you want to scrape.
- "country": the country we wish to appear in.
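In practice, the only change to our requests is that we wrap the target URL with get_scrapeops_url() before calling requests.get(). For example, assuming the function and API key above are already defined:
target_url = "https://www.bestbuy.com/site/searchpage.jsp?cp=1&st=gpu"
response = requests.get(get_scrapeops_url(target_url, location="us"))
print(response.status_code)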
Here is our production ready crawler.
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")
for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True
name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"
rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]
search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Time to test it out! We're going to crawl 5 pages on 5 threads. You can view our updated main in the snippet below.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Feel free to change any of the following to tweak your results:
- MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
- MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
- PAGES: Determines how many pages of BestBuy search results to scrape for each keyword.
- LOCATION: Specifies the geographical location (country) for the search.
- keyword_list: The list of keywords for which the script will perform the search and subsequent scraping.
Here are our results.
We crawled 5 pages of GPUs in 29.862 seconds. 29.862 / 5 = 5.972 seconds per page.
Build A BestBuy Scraper
Now that our crawler is giving us a report, we're going to build a review scraper. This scraper is going to perform the following actions:
- Read the CSV file.
- Parse review data from each row in the CSV.
- Store our parsed review data.
- Use concurrency to run steps 2 and 3 on multiple items at the same time.
- Integrate with the ScrapeOps API in order to bypass anti-bots and avoid anything else that might block us.
Step 1: Create Simple Product Data Parser
Just like we did earlier, we're going to start by building a simple data parser. This should feel quite familiar. Once again, pay attention to the parsing logic.
def process_item(row, location, retries=3):
url = row["url"]
if url == "n/a":
return
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("li", class_="review-item-simple")
for review_card in review_cards:
rating_holder = review_card.find("div", class_="review-rating")
rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
name = review_card.find("h4").text
incentivized = False
incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
if incentivized_button:
incentivized = True
verified = False
verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
if verified_button:
verified = True
review_data = {
"name": name,
"rating": rating,
"incentivized": incentivized,
"verified": verified
}
print(review_data)
success = True
else:
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
In each review_card, we follow these steps to extract our data:
- review_card.find("div", class_="review-rating") finds the rating_holder.
- float(rating_holder.find("p", class_="visually-hidden").text.split()[1]) gives our rating.
- review_card.find("h4").text gives our name.
- review_card.select_one("button[title='badge for Incentivized']") tells us whether or not the review was incentivized.
- We use review_card.select_one("button[title='badge for Verified Purchaser']") to determine whether the purchase was verified.
Step 2: Loading URLs To Scrape
In order to use our parsing function, we need to give it a URL. Here, we'll write a function that reads our CSV file into an array of dict objects. Then it passes each of those objects into process_item().
Here is process_results().
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
When you combine it with process_item() and add the whole thing to our code, it looks like this.
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")
for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True
name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"
rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]
search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
sponsored=sponsored
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
if url == "n/a":
return
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("li", class_="review-item-simple")
for review_card in review_cards:
rating_holder = review_card.find("div", class_="review-rating")
rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
name = review_card.find("h4").text
incentivized = False
incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
if incentivized_button:
incentivized = True
verified = False
verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
if verified_button:
verified = True
review_data = {
"name": name,
"rating": rating,
"incentivized": incentivized,
"verified": verified
}
print(review_data)
success = True
else:
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
We've already got almost everything we need to store our data; we just need one more dataclass. Since this one is used to represent reviews, we'll call it ReviewData. It's very similar to SearchData.
@dataclass
class ReviewData:
name: str = ""
rating: float = 0.0
incentivized: bool = False
verified: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
In the code below, we open a new DataPipeline within our parsing function. We then pass ReviewData into the pipeline.
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
sponsored: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
rating: float = 0.0
incentivized: bool = False
verified: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")
for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True
name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"
rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]
search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
spoonsored=sponsored
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
if url == "n/a":
return
tries = 0
success = False
while tries <= retries and not success:
        response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("li", class_="review-item-simple")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for review_card in review_cards:
rating_holder = review_card.find("div", class_="review-rating")
rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
name = review_card.find("h4").text
incentivized = False
incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
if incentivized_button:
incentivized = True
verified = False
verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
if verified_button:
                        verified = True
review_data = ReviewData(
name=name,
rating=rating,
incentivized=incentivized,
verified=verified
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- We open a new `DataPipeline` inside of our parsing function.
- We pass `ReviewData` into the pipeline as it gets parsed.
Step 4: Adding Concurrency
Concurrency gets added exactly the way it did before: the `for` loop in `process_results()` is replaced by a call to `ThreadPoolExecutor`.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
This time:
- `process_item` is the function we want to call on each thread.
- `reader` is the array of search result items we want to look up and parse.
- All other args get passed in as arrays as well, as shown in the sketch below.
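If the way `executor.map()` spreads those arrays across threads feels opaque, here is a minimal standalone sketch with made-up data (the `process_item_demo()` function is just a stand-in): the i-th element of each iterable becomes the arguments of the i-th call.
import concurrent.futures

# Hypothetical stand-in for process_item(); it simply echoes its arguments.
def process_item_demo(row, location, retries):
    return f"row={row}, location={location}, retries={retries}"

reader = [{"name": "gpu-1"}, {"name": "gpu-2"}, {"name": "gpu-3"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # executor.map() zips the iterables: call i receives reader[i], "us", 3
    results = executor.map(
        process_item_demo,
        reader,
        ["us"] * len(reader),
        [3] * len(reader)
    )
    for result in results:
        print(result)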
Step 5: Bypassing Anti-Bots
To get past anti-bots, we already discussed what we need to do, and we already wrote a proxy function. We just need to use it in the right place by changing a single line in `process_item()`.
response = requests.get(get_scrapeops_url(url, location=location))
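If you want to see what that single change actually sends, here is a small standalone sketch (with a placeholder API key) that prints the proxied URL so you can confirm the BestBuy URL and country get encoded as query parameters.
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder; the real script loads this from config.json

def get_scrapeops_url(url, location="us"):
    payload = {"api_key": API_KEY, "url": url, "country": location}
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# The BestBuy URL ends up URL-encoded inside the proxy request
print(get_scrapeops_url("https://www.bestbuy.com/site/searchpage.jsp?st=gpu"))
# https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.bestbuy.com%2Fsite%2Fsearchpage.jsp%3Fst%3Dgpu&country=us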
Here is our full code, ready to run in production.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
model_number: str = ""
sku: str = ""
rating: float = 0.0
spoonsored: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
rating: float = 0.0
incentivized: bool = False
verified: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.bestbuy.com/site/searchpage.jsp?cp={page_number+1}&st={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="shop-sku-list-item")
for div_card in div_cards:
sponsored = False
sponsored_tag = div_card.find("div", class_="is-sponsored")
if sponsored_tag:
sponsored = True
name = div_card.find("h4", class_="sku-title").text
price_holder = div_card.select_one("div[data-testid='customer-price']")
price = price_holder.select_one("span[aria-hidden='true']").text
model_holder = div_card.find("div", class_="sku-model")
model_info_array = model_holder.find_all("span", class_="sku-value")
model_number = model_info_array[0].text
sku_number = model_info_array[1].text
rating_holder = div_card.find("div", class_="ratings-reviews")
href = rating_holder.find("a")
link = "n/a"
if href:
link = f"https://www.bestbuy.com{href.get('href')}"
rating_text = rating_holder.find("p", class_="visually-hidden").text
rating = 0.0
if rating_text != "Not Yet Reviewed":
rating = rating_text.split(" ")[1]
search_data = SearchData(
name=name,
url=link,
price=price,
model_number=model_number,
sku=sku_number,
rating=rating,
spoonsored=sponsored
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
if url == "n/a":
return
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("li", class_="review-item-simple")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for review_card in review_cards:
rating_holder = review_card.find("div", class_="review-rating")
rating = float(rating_holder.find("p", class_="visually-hidden").text.split()[1])
name = review_card.find("h4").text
incentivized = False
incentivized_button = review_card.select_one("button[title='badge for Incentivized']")
if incentivized_button:
incentivized = True
verified = False
verified_button = review_card.select_one("button[title='badge for Verified Purchaser']")
if verified_button:
                        verified = True
review_data = ReviewData(
name=name,
rating=rating,
incentivized=incentivized,
verified=verified
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Now, we're going to run the same crawl we ran earlier, except this time we'll also scrape reviews for each GPU we find during the crawl. As before, feel free to change any of the constants to tweak your results.
Here is our updated `main`.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here are our results.
The full run took 231.985 seconds. If you remember, the crawl alone took 29.862 seconds.
The crawl generated a report with 72 results. 231.985 - 29.862 = 202.123 seconds spent scraping reviews. 202.123 seconds / 72 pages = 2.807 seconds per page.
That's extremely fast, roughly twice as fast per page as our initial crawl (which was decent to begin with).
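If you'd like to check the arithmetic yourself, here it is as a tiny script using the figures from this run.
# Figures from this production run
total_run = 231.985      # seconds for the crawl plus the review scrape
crawl_only = 29.862      # seconds the crawl took on its own
pages_scraped = 72       # review pages processed from the report

scrape_time = total_run - crawl_only               # 202.123 seconds scraping reviews
print(f"{scrape_time:.3f} seconds scraping reviews")
print(f"{scrape_time / pages_scraped:.3f} seconds per page")  # ~2.807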
Legal and Ethical Considerations
Scraping the web is generally considered legal as long as you're scraping public data. Public data is any data not gated behind a login.
Private data is a completely different story. With private data, you're subject to an entirely different set of rules and regulations. If you're not sure about your scraper, consult an attorney.
Along with the legality of your scrape, you also need to pay attention to the site's own rules, in particular its Terms and Conditions and its `robots.txt`. Violating these policies can result in suspension or even a permanent ban.
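If you'd like to check `robots.txt` programmatically before scraping, Python's standard library ships `urllib.robotparser`. Here is a minimal sketch (separate from the scraper above) that asks whether a given URL may be fetched.
from urllib.robotparser import RobotFileParser

# Fetch and parse the site's robots.txt
robots = RobotFileParser()
robots.set_url("https://www.bestbuy.com/robots.txt")
robots.read()

# Ask whether a generic user agent may fetch a given URL
url = "https://www.bestbuy.com/site/searchpage.jsp?st=gpu"
if robots.can_fetch("*", url):
    print(f"robots.txt allows: {url}")
else:
    print(f"robots.txt disallows: {url}")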
Conclusion
You now know how to scrape BestBuy. You got a crash course in iterative building and you know how to pull nested data out of an HTML page. To get a better understanding of the tech stack used in this article, take a look at the links below.
More Python Web Scraping Guides
If you enjoyed this article, check out some other ones from the Python Web Scraping Playbook. We've got learning resources for developers of all experience levels. If you'd like to learn more from our "How To Scrape" series, take a look at the links below.