Skip to main content

Scrape Walmart With Python Requests and BeautifulSoup

How to Scrape Walmart With Requests and BeautifulSoup

Founded in 1962, Walmart is the largest retailer in the world. Behind Amazon, it's the second largest online retailer globally. When we scrape Walmart, we get access to all sorts of products from all over the globe.

In today's guide, we'll be crawling Walmart search pages and then scraping product reviews.


TLDR - How to Scrape Walmart

If you need a pre-built Walmart scraper, give this one a try.

  1. Create a new project folder.
  2. Then, make a config.json file with your ScrapeOps API key.
  3. Finally, copy/paste the code below into a new Python file in the same folder.
  4. You can then run it with python name_of_your_file.py.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

# Load the ScrapeOps API key from config.json (it must contain an "api_key" entry).
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)

API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` in a ScrapeOps Proxy API request URL.

    Args:
        url: The page we actually want fetched.
        location: Country code ScrapeOps should route the request through.

    Returns:
        The proxy endpoint with ``api_key``, ``url`` and ``country`` encoded
        as query parameters.
    """
    query = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return f"https://proxy.scrapeops.io/v1/?{query}"


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One product row from a Walmart search results page.

    After construction, empty string fields are replaced with
    ``"No <field>"`` and other string fields are stripped of surrounding
    whitespace.
    """

    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Apply the placeholder/strip normalisation to every ``str`` attribute."""
        for fld in fields(self):
            current = getattr(self, fld.name)
            if not isinstance(current, str):
                continue
            # "" gets a readable placeholder; anything else is only trimmed.
            cleaned = current.strip() if current else f"No {fld.name}"
            setattr(self, fld.name, cleaned)

@dataclass
class ReviewData:
    """One customer review scraped from a Walmart product review page.

    After construction, empty string fields become ``"No <field>"`` and the
    remaining string fields are stripped of surrounding whitespace.
    """

    name: str = ""
    author_id: str = ""
    rating: int = 0
    date: str = ""
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Apply the placeholder/strip normalisation to every ``str`` attribute."""
        for fld in fields(self):
            current = getattr(self, fld.name)
            if not isinstance(current, str):
                continue
            # "" gets a readable placeholder; anything else is only trimmed.
            cleaned = current.strip() if current else f"No {fld.name}"
            setattr(self, fld.name, cleaned)


class DataPipeline:
    """Buffer dataclass records and append them to a CSV file in batches.

    Records are de-duplicated on their ``name`` attribute, queued, and flushed
    to ``csv_filename`` once ``storage_queue_limit`` records are waiting (and
    again by ``close_pipeline()``). A header row is written only when the file
    is missing or empty.

    One instance may be shared by several worker threads (the concurrent
    ``start_scrape`` does this), so every method runs under a re-entrant lock
    rather than polling a flag with ``time.sleep``.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        import threading  # local import keeps this class self-contained

        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # Kept for compatibility; True only while a batch is being written.
        self.csv_file_open = False
        # RLock because add_data() and close_pipeline() call save_to_csv()
        # while already holding it.
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue."""
        with self._lock:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                # Return before touching the flag so it can never stay True.
                return

            self.csv_file_open = True
            try:
                keys = [data_field.name for data_field in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    if not file_exists:
                        writer.writeheader()
                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset even if the write fails so later flushes are not blocked.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with this ``name`` was seen before; otherwise remember it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.append(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush once the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still waiting in the queue."""
        with self._lock:
            if self.storage_queue:
                self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of Walmart search results through the ScrapeOps proxy.

    Every product on the page is turned into a ``SearchData`` record and
    handed to ``data_pipeline.add_data()``.

    Args:
        keyword: Search term, e.g. ``"laptop"``.
        location: Country code forwarded to the proxy (e.g. ``"us"``).
        page_number: Zero-based page index; Walmart numbers pages from 1, so
            the request uses ``page_number + 1``.
        data_pipeline: Object with an ``add_data()`` method (a ``DataPipeline``).
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    # urlencode() escapes characters such as "&", "#" or "%" that a plain
    # space-to-"+" replacement leaves in place (corrupting the query string);
    # spaces are still encoded as "+".
    query = urlencode({"q": keyword, "page": page_number + 1})
    url = f"https://www.walmart.com/search?{query}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page {page_number}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            # The search data is embedded in the __NEXT_DATA__ JSON script tag.
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception(f"No __NEXT_DATA__ script found on page {page_number}")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                # Only entries typed as "Product" are scraped.
                if item.get("__typename") != "Product":
                    continue
                name = item.get("name")
                if not name:
                    continue
                product_id = item["usItemId"]

                # Optional fields fall back to the SearchData defaults so one
                # incomplete product does not fail (and re-fetch) the whole page.
                search_data = SearchData(
                    name=name,
                    stars=item.get("averageRating", 0),
                    url=f"https://www.walmart.com/reviews/product/{product_id}",
                    sponsored=item.get("isSponsoredFlag", False),
                    price=item.get("price", 0.0),
                    product_id=product_id,
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Crawl search-result pages ``0 .. pages - 1`` for ``keyword`` concurrently.

    Args:
        keyword: Search term passed to every ``scrape_search_results`` call.
        pages: Number of result pages to crawl.
        location: Country code forwarded to the proxy.
        data_pipeline: Shared pipeline that receives every parsed product.
        max_threads: Size of the worker thread pool.
        retries: Retry budget for each page.

    A page that still fails after its retries is logged and the remaining
    pages keep running.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # executor.map() only re-raises worker exceptions when its result
        # iterator is consumed; never consuming it hides every failed page.
        # Keep the futures and inspect each one instead.
        futures = {
            executor.submit(scrape_search_results, keyword, location, page, data_pipeline, retries): page
            for page in range(pages)
        }
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                logger.error(f"Page {futures[future]} for '{keyword}' failed: {exc}")


def process_item(row, location, retries=3):
    """Scrape the reviews on one product's Walmart review page into a CSV file.

    The reviews are written to ``<product-name>.csv`` through a fresh
    ``DataPipeline``. That pipeline de-duplicates on ``name``, which here is the
    reviewer nickname.

    Args:
        row: A row from the crawl CSV; must provide ``"url"`` and ``"name"``.
        location: Country code forwarded to the ScrapeOps proxy.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    import re  # local import: only this function needs it

    url = row["url"]
    # Product names can contain characters such as '"' or ':' that are not
    # allowed in Windows filenames; drop those along with '/' and '\'.
    safe_name = re.sub(r'[\\/:*?"<>|]', "", row["name"].replace(" ", "-"))
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Issued inside the try so connection errors are retried as well.
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception(f"No __NEXT_DATA__ script found on {url}")
            json_data = json.loads(script_tag.text)
            review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"]

            # Build every record before touching the CSV so a parsing error
            # mid-page cannot leave partial rows that the retry appends again.
            # Missing/null fields become ReviewData's "No <field>" placeholders.
            parsed_reviews = [
                ReviewData(
                    name=entry.get("userNickname") or "",
                    author_id=entry.get("authorId") or "",
                    rating=entry.get("rating") or 0,
                    date=entry.get("reviewSubmissionTime") or "",
                    review=entry.get("reviewText") or "",
                )
                for entry in review_list
            ]

            review_pipeline = DataPipeline(csv_filename=f"{safe_name}.csv")
            for review_data in parsed_reviews:
                review_pipeline.add_data(review_data)
            review_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    """Scrape reviews for every product listed in a crawl CSV, concurrently.

    Args:
        csv_file: Path of a CSV produced by the search crawler.
        location: Country code forwarded to the proxy.
        max_threads: Size of the worker thread pool.
        retries: Retry budget passed to each ``process_item`` call.

    A product whose review scrape ultimately fails is logged; the others
    keep running.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes UTF-8, so read it back as UTF-8 instead of the
    # platform default encoding (e.g. cp1252 on Windows).
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # executor.map() only surfaces worker exceptions when its results are
        # iterated; keep the futures so failed products are logged, not lost.
        futures = {
            executor.submit(process_item, row, location, retries): row
            for row in reader
        }
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                logger.error(f"Review scrape failed for {futures[future].get('url')}: {exc}")

if __name__ == "__main__":

    # Run settings -- adjust to taste.
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 4
    LOCATION = "us"

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    logger.info(f"Crawl starting...")

    ## Job Processes
    # Stage 1: crawl the search results for each keyword into "<keyword>.csv".
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_csv = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=crawl_csv)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(crawl_csv)
    logger.info(f"Crawl complete.")

    # Stage 2: scrape the reviews of every product found in stage 1.
    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

To change your results, feel free to change any of the following:

  • MAX_RETRIES: Defines the maximum number of times the script will retry fetching a webpage if a request fails due to issues such as network timeouts or non-200 HTTP responses.
  • MAX_THREADS: Sets the maximum number of threads that will be used concurrently while scraping.
  • PAGES: The number of search result pages to scrape for each keyword.
  • LOCATION: The location or country code where the products or reviews will be scraped from.
  • keyword_list: A list of product keywords to search for on Walmart’s website (e.g., ["laptop"]).

How To Architect Our Walmart Scraper

If you look at the screenshot below, this is the page you'll get if you don't scrape Walmart properly.

Walmart Blocked Page

To scrape Walmart, our project is going to consist of two separate scrapers, a result crawler and a review scraper.

  1. We'll run our result crawler to generate a large report of items matching our keyword. For example, if we want to search laptops, our crawler will generate a large report on laptops.
  2. Our review scraper is going to look up the individual reviews for each laptop in the report.

Here are the steps for our result crawler.

  1. Parse Walmart search data.
  2. Paginate our results for control over our data.
  3. Store our scraped data.
  4. Concurrently run steps 1 through 3 on multiple pages.
  5. Add proxy integration to bypass any anti-bots and avoid the block screen you saw earlier in this article.

The review scraper will then perform these actions.

  1. Read the CSV into an array.
  2. Parse reviews on each item from the array.
  3. Store our extracted data.
  4. Run steps 2 and 3 on multiple pages using concurrency.
  5. Integrate with the ScrapeOps Proxy to once again bypass any anti-bots.

Understanding How To Scrape Walmart

Time to get a look at Walmart from a high level. In these coming sections, we're going to look at different Walmart pages and see how they're built. Then, we'll look at how to extract our data and how to better control our results.


Step 1: How To Request Walmart Pages

Just like with any other site, we first need to perform a simple GET request.

We make a GET, and Walmart's server sends our response back as an HTML page.

The first page we'll look at is our search results. Then, we'll look at the review pages.

Our search URLs are laid out like this:

https://www.walmart.com/search?q={formatted_keyword}

Walmart Search Results Page

Our review pages are laid out like this:

https://www.walmart.com/reviews/product/{product_id}

Take a look at the page below.

Walmart product reviews


Step 2: How To Extract Data From Walmart Results and Pages

Now, let's take a closer look at exactly which data we'll be extracting. Luckily for us, both the results page and the review page hold all their information in a JSON blob embedded in the page, inside a `<script id="__NEXT_DATA__">` tag.

You can view the search result JSON below. Walmart Search Results Page HTML Inspection

Here is the JSON blob for the reviews page. Walmart product reviews HTML Inspection


Step 3: How To Control Pagination

Like many other sites, we can control our pagination by adding a simple query param, page. If we want to view page 1, our URL would contain page=1.

Our fully paginated URL would be

https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}

We use page_number+1 because Walmart's pages start at 1, while Python's range() function begins counting at 0.


Step 4: Geolocated Data

For geolocation, we'll be using the country param when talking to the ScrapeOps Proxy API. When we pass this parameter, ScrapeOps will route us through the country of our choosing.

  • If we want to appear in the US, we'd pass "country": "us".
  • If we wish to appear in the UK, we'd pass "country": "uk".

Setting Up Our Walmart Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir walmart-scraper

cd walmart-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build A Walmart Search Crawler

Time to get started with our search crawler. This will be the foundation of the entire project. We'll go through and add the following features in order.

  1. Parsing
  2. Pagination
  3. Data Storage
  4. Concurrency
  5. Proxy Integration.

Step 1: Create Simple Search Data Parser

Let's get started by building a parser. Our parser will do a keyword search on Walmart. Then, it will extract data from the results.

In our code example below, we add error handling, basic structure, retry logic and our base parsing function. Everything is important, but you should pay special attention to our parsing function, scrape_search_results().

Here's the code we'll start with.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

# Load the ScrapeOps API key from config.json (it must contain an "api_key" entry).
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)

API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    """Fetch the first Walmart search results page for ``keyword`` and print each product.

    Args:
        keyword: Search term, e.g. ``"laptop"``.
        location: Country code; unused in this version.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    # urlencode() escapes characters such as "&" or "#" that would corrupt
    # the query string; spaces are still encoded as "+".
    url = "https://www.walmart.com/search?" + urlencode({"q": keyword})
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                # This version has no page_number, so identify the page by its
                # URL; referencing page_number here would raise a NameError.
                raise Exception(f"Failed to get page {url}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            # The search data is embedded in the __NEXT_DATA__ JSON script tag.
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception(f"No __NEXT_DATA__ script found on {url}")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                # Only entries typed as "Product" are scraped.
                if item.get("__typename") != "Product":
                    continue
                name = item.get("name")
                if not name:
                    continue
                product_id = item["usItemId"]

                search_data = {
                    "name": name,
                    "stars": item.get("averageRating", 0),
                    "url": f"https://www.walmart.com/reviews/product/{product_id}",
                    "sponsored": item.get("isSponsoredFlag", False),
                    "price": item.get("price", 0.0),
                    "product_id": product_id,
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

if __name__ == "__main__":

    # Run settings (MAX_THREADS and PAGES are not used yet in this version).
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 1
    LOCATION = "us"

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    logger.info(f"Crawl starting...")

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")

All of our data comes embedded within a JSON blob, here's how we handle it.

  • soup.select_one("script[id='__NEXT_DATA__'][type='application/json']") finds the JSON.
  • We use json.loads() to convert the text into a JSON object.
  • json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"] is used to access our item list.
  • We iterate through the list and pull the following from each item:
    • "name": the name of the product.
    • "stars": the overall rating for the product.
    • "url": the link to the product's reviews.
    • "sponsored": whether or not it is a sponsored item, basically an ad.
    • "price": the price of the item.
    • "product_id": the unique number assigned to each product on the site.

Step 2: Add Pagination

We know how to parse our results, but when we do a search, we can only get the first page of results. To handle this issue, we need to add pagination. Pagination allows us to request specific pages. To add pagination, we'll add a page parameter to our URL.

Our URLs will now look like this. We use page_number+1 because our Walmart pages start at 1, but Python's range() function begins counting at 0.

"https://www.walmart.com/search?q={formatted_keyword}&page={page_number+1}"

We also add a start_scrape() function. This one takes the number of pages to scrape and runs scrape_search_results() on each page, starting from page 0.

def start_scrape(keyword, pages, location, retries=3):
    """Scrape search pages ``0 .. pages - 1`` for ``keyword``, one after another."""
    for page_index in range(pages):
        scrape_search_results(keyword, location, page_index, retries=retries)

You can see our fuller updated code below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

# Load the ScrapeOps API key from config.json (it must contain an "api_key" entry).
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)

API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
    """Fetch one Walmart search results page and print each product found.

    Args:
        keyword: Search term, e.g. ``"laptop"``.
        location: Country code; unused in this version.
        page_number: Zero-based page index; Walmart numbers pages from 1, so
            the request uses ``page_number + 1``.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    # urlencode() escapes characters such as "&" or "#" that would corrupt
    # the query string; spaces are still encoded as "+".
    query = urlencode({"q": keyword, "page": page_number + 1})
    url = f"https://www.walmart.com/search?{query}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page {page_number}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            # The search data is embedded in the __NEXT_DATA__ JSON script tag.
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception(f"No __NEXT_DATA__ script found on page {page_number}")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                # Only entries typed as "Product" are scraped.
                if item.get("__typename") != "Product":
                    continue
                name = item.get("name")
                if not name:
                    continue
                product_id = item["usItemId"]

                search_data = {
                    "name": name,
                    "stars": item.get("averageRating", 0),
                    "url": f"https://www.walmart.com/reviews/product/{product_id}",
                    "sponsored": item.get("isSponsoredFlag", False),
                    "price": item.get("price", 0.0),
                    "product_id": product_id,
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, retries=3):
    """Scrape search pages ``0 .. pages - 1`` for ``keyword``, one after another."""
    for page_index in range(pages):
        scrape_search_results(keyword, location, page_index, retries=retries)


if __name__ == "__main__":

    # Run settings (MAX_THREADS is not used yet in this version).
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 1
    LOCATION = "us"

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    logger.info(f"Crawl starting...")

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")
  • We added a page argument to both our parsing function and our URL.
  • start_scrape() allows us to scrape a list of pages.

Step 3: Storing the Scraped Data

After we extract our data, we need to store it. Without storage, scraping the web would be entirely pointless!

We'll store our data inside a CSV file. The CSV format is really convenient. It allows a human to open it up easily and view results as a spreadsheet. On top of that, each CSV row is essentially a set of key-value pairs, with the header row supplying the keys. Later, we'll write Python code to read the CSV file and then scrape reviews on the products from it.

First, we need a dataclass. We're going to call this one, SearchData, because we'll use it to represent objects from our search results.

@dataclass
class SearchData:
    """One product row from a Walmart search results page.

    After construction, empty string fields are replaced with
    ``"No <field>"`` and other string fields are stripped of surrounding
    whitespace.
    """

    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Apply the placeholder/strip normalisation to every ``str`` attribute."""
        for fld in fields(self):
            current = getattr(self, fld.name)
            if not isinstance(current, str):
                continue
            # "" gets a readable placeholder; anything else is only trimmed.
            cleaned = current.strip() if current else f"No {fld.name}"
            setattr(self, fld.name, cleaned)

Now, we need a DataPipeline. This class opens up a pipe to a CSV file. On top of that, it does a couple other important things. If the file doesn't exist, our class creates it. If the CSV already exists, it appends to it instead.

Our DataPipeline also uses the name attribute to filter out duplicate data.

class DataPipeline:
    """Buffer dataclass records and append them to a CSV file in batches.

    Records are de-duplicated on their ``name`` attribute, queued, and flushed
    to ``csv_filename`` once ``storage_queue_limit`` records are waiting (and
    again by ``close_pipeline()``). A header row is written only when the file
    is missing or empty.

    One instance may be shared by several worker threads, so every method
    runs under a re-entrant lock rather than polling a flag with
    ``time.sleep``.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        import threading  # local import keeps this class self-contained

        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # Kept for compatibility; True only while a batch is being written.
        self.csv_file_open = False
        # RLock because add_data() and close_pipeline() call save_to_csv()
        # while already holding it.
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue."""
        with self._lock:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                # Return before touching the flag so it can never stay True.
                return

            self.csv_file_open = True
            try:
                keys = [data_field.name for data_field in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    if not file_exists:
                        writer.writeheader()
                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset even if the write fails so later flushes are not blocked.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with this ``name`` was seen before; otherwise remember it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.append(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush once the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still waiting in the queue."""
        with self._lock:
            if self.storage_queue:
                self.save_to_csv()

In our full code, we now open a DataPipeline and pass it into our crawling functions. Then, instead of printing our data, we turn it into SearchData and pass it into the DataPipeline.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

# Load the ScrapeOps API key from config.json (it must contain an "api_key" entry).
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)

API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    """One product row from a Walmart search results page.

    After construction, empty string fields are replaced with
    ``"No <field>"`` and other string fields are stripped of surrounding
    whitespace.
    """

    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Apply the placeholder/strip normalisation to every ``str`` attribute."""
        for fld in fields(self):
            current = getattr(self, fld.name)
            if not isinstance(current, str):
                continue
            # "" gets a readable placeholder; anything else is only trimmed.
            cleaned = current.strip() if current else f"No {fld.name}"
            setattr(self, fld.name, cleaned)


class DataPipeline:
    """Buffer dataclass records and append them to a CSV file in batches.

    Records are de-duplicated on their ``name`` attribute, queued, and flushed
    to ``csv_filename`` once ``storage_queue_limit`` records are waiting (and
    again by ``close_pipeline()``). A header row is written only when the file
    is missing or empty.

    One instance may be shared by several worker threads, so every method
    runs under a re-entrant lock rather than polling a flag with
    ``time.sleep``.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        import threading  # local import keeps this class self-contained

        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # Kept for compatibility; True only while a batch is being written.
        self.csv_file_open = False
        # RLock because add_data() and close_pipeline() call save_to_csv()
        # while already holding it.
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue."""
        with self._lock:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                # Return before touching the flag so it can never stay True.
                return

            self.csv_file_open = True
            try:
                keys = [data_field.name for data_field in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    if not file_exists:
                        writer.writeheader()
                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset even if the write fails so later flushes are not blocked.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with this ``name`` was seen before; otherwise remember it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.append(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush once the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still waiting in the queue."""
        with self._lock:
            if self.storage_queue:
                self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of Walmart search results into a data pipeline.

    Every product on the page is turned into a ``SearchData`` record and
    handed to ``data_pipeline.add_data()``.

    Args:
        keyword: Search term, e.g. ``"laptop"``.
        location: Country code; unused in this version.
        page_number: Zero-based page index; Walmart numbers pages from 1, so
            the request uses ``page_number + 1``.
        data_pipeline: Object with an ``add_data()`` method (a ``DataPipeline``).
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    # urlencode() escapes characters such as "&" or "#" that would corrupt
    # the query string; spaces are still encoded as "+".
    query = urlencode({"q": keyword, "page": page_number + 1})
    url = f"https://www.walmart.com/search?{query}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page {page_number}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            # The search data is embedded in the __NEXT_DATA__ JSON script tag.
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception(f"No __NEXT_DATA__ script found on page {page_number}")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                # Only entries typed as "Product" are scraped.
                if item.get("__typename") != "Product":
                    continue
                name = item.get("name")
                if not name:
                    continue
                product_id = item["usItemId"]

                # Optional fields fall back to the SearchData defaults so one
                # incomplete product does not fail (and re-fetch) the whole page.
                search_data = SearchData(
                    name=name,
                    stars=item.get("averageRating", 0),
                    url=f"https://www.walmart.com/reviews/product/{product_id}",
                    sponsored=item.get("isSponsoredFlag", False),
                    price=item.get("price", 0.0),
                    product_id=product_id,
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
    """Scrape search pages ``0 .. pages - 1`` for ``keyword`` sequentially into ``data_pipeline``."""
    for page_index in range(pages):
        scrape_search_results(
            keyword,
            location,
            page_index,
            data_pipeline=data_pipeline,
            retries=retries,
        )


if __name__ == "__main__":

    # Run settings (MAX_THREADS is not used yet in this version).
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 1
    LOCATION = "us"

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["laptop"]
    aggregate_files = []

    logger.info(f"Crawl starting...")

    ## Job Processes
    # Each keyword gets its own "<keyword>.csv" results file.
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_csv = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=crawl_csv)
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(crawl_csv)
    logger.info(f"Crawl complete.")
  • SearchData represents a search results object.
  • DataPipeline is used to pipe dataclass (in this case, SearchData) objects to a CSV.

Step 4: Adding Concurrency

Now, to add concurrency. We'll use ThreadPoolExecutor. This gives us the power of multithreading. We'll open up a new thread pool and then parse an individual page on each available thread.

Take a look below, we've rewritten start_scrape() and removed the for loop.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Crawl search-result pages ``0 .. pages - 1`` for ``keyword`` concurrently.

    Args:
        keyword: Search term passed to every ``scrape_search_results`` call.
        pages: Number of result pages to crawl.
        location: Country code forwarded to the scraper.
        data_pipeline: Shared pipeline that receives every parsed product.
        max_threads: Size of the worker thread pool.
        retries: Retry budget for each page.

    A page that still fails after its retries is logged and the remaining
    pages keep running.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # executor.map() only re-raises worker exceptions when its result
        # iterator is consumed; never consuming it hides every failed page.
        # Keep the futures and inspect each one instead.
        futures = {
            executor.submit(scrape_search_results, keyword, location, page, data_pipeline, retries): page
            for page in range(pages)
        }
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                logger.error(f"Page {futures[future]} for '{keyword}' failed: {exc}")

You may view our fully updated script below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

# Load the ScrapeOps API key from config.json (it must contain an "api_key" entry).
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)

API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` in a ScrapeOps Proxy API request URL.

    Args:
        url: The page we actually want fetched.
        location: Country code ScrapeOps should route the request through.

    Returns:
        The proxy endpoint with ``api_key``, ``url`` and ``country`` encoded
        as query parameters.
    """
    query = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return f"https://proxy.scrapeops.io/v1/?{query}"


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One product row from a Walmart search results page.

    After construction, empty string fields are replaced with
    ``"No <field>"`` and other string fields are stripped of surrounding
    whitespace.
    """

    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Apply the placeholder/strip normalisation to every ``str`` attribute."""
        for fld in fields(self):
            current = getattr(self, fld.name)
            if not isinstance(current, str):
                continue
            # "" gets a readable placeholder; anything else is only trimmed.
            cleaned = current.strip() if current else f"No {fld.name}"
            setattr(self, fld.name, cleaned)


class DataPipeline:
    """Buffer dataclass records and append them to a CSV file in batches.

    Records are de-duplicated on their ``name`` attribute, queued, and flushed
    to ``csv_filename`` once ``storage_queue_limit`` records are waiting (and
    again by ``close_pipeline()``). A header row is written only when the file
    is missing or empty.

    One instance may be shared by several worker threads (the concurrent
    ``start_scrape`` does this), so every method runs under a re-entrant lock
    rather than polling a flag with ``time.sleep``.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        import threading  # local import keeps this class self-contained

        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # Kept for compatibility; True only while a batch is being written.
        self.csv_file_open = False
        # RLock because add_data() and close_pipeline() call save_to_csv()
        # while already holding it.
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue."""
        with self._lock:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                # Return before touching the flag so it can never stay True.
                return

            self.csv_file_open = True
            try:
                keys = [data_field.name for data_field in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    if not file_exists:
                        writer.writeheader()
                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset even if the write fails so later flushes are not blocked.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with this ``name`` was seen before; otherwise remember it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.append(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush once the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still waiting in the queue."""
        with self._lock:
            if self.storage_queue:
                self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of Walmart search results through the ScrapeOps proxy.

    Every product on the page is turned into a ``SearchData`` record and
    handed to ``data_pipeline.add_data()``.

    Args:
        keyword: Search term, e.g. ``"laptop"``.
        location: Country code forwarded to the proxy (e.g. ``"us"``).
        page_number: Zero-based page index; Walmart numbers pages from 1, so
            the request uses ``page_number + 1``.
        data_pipeline: Object with an ``add_data()`` method (a ``DataPipeline``).
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    # urlencode() escapes characters such as "&", "#" or "%" that a plain
    # space-to-"+" replacement leaves in place (corrupting the query string);
    # spaces are still encoded as "+".
    query = urlencode({"q": keyword, "page": page_number + 1})
    url = f"https://www.walmart.com/search?{query}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page {page_number}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            # The search data is embedded in the __NEXT_DATA__ JSON script tag.
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception(f"No __NEXT_DATA__ script found on page {page_number}")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                # Only entries typed as "Product" are scraped.
                if item.get("__typename") != "Product":
                    continue
                name = item.get("name")
                if not name:
                    continue
                product_id = item["usItemId"]

                # Optional fields fall back to the SearchData defaults so one
                # incomplete product does not fail (and re-fetch) the whole page.
                search_data = SearchData(
                    name=name,
                    stars=item.get("averageRating", 0),
                    url=f"https://www.walmart.com/reviews/product/{product_id}",
                    sponsored=item.get("isSponsoredFlag", False),
                    price=item.get("price", 0.0),
                    product_id=product_id,
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Crawl search-result pages 0..pages-1 for ``keyword`` on a thread pool.

    Each page is handled by scrape_search_results. A page that still fails after its
    retries is logged and skipped, so the remaining pages are unaffected.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # executor.map() only re-raises worker exceptions when its results are iterated;
        # they were never iterated, so failed pages disappeared without a trace.
        future_to_page = {
            executor.submit(scrape_search_results, keyword, location, page_number, data_pipeline, retries): page_number
            for page_number in range(pages)
        }
        for future in concurrent.futures.as_completed(future_to_page):
            try:
                future.result()
            except Exception as e:
                logger.error(f"Page {future_to_page[future]} for '{keyword}' failed: {e}")


if __name__ == "__main__":

    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    # INPUT: keywords to search for; each one produces its own CSV report.
    keyword_list = ["laptop"]
    aggregate_files = []

    for keyword in keyword_list:
        report_file = f"{keyword.replace(' ', '-')}.csv"
        crawl_pipeline = DataPipeline(csv_filename=report_file)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        # Flush whatever is still buffered before moving on to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_file)
    logger.info(f"Crawl complete.")

Pay attention to our args in executor.map():

  • scrape_search_results: our parsing function.
  • All other arguments get passed in as lists of equal length. The executor takes the n-th element of each list and passes them together into the n-th call of scrape_search_results.

Step 5: Bypassing Anti-Bots

Remember the blocked screen from the top of the page? That is Walmart's anti-bot system at work, and routing our requests through a proxy gets us past it.

Walmart Blocked Page

In this section, we're going to write a simple function that routes our requests through a proxy. It takes a URL and a location, combines them with our ScrapeOps Proxy API key, and returns a fully proxied URL.

Here is our proxy function.

def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` in a ScrapeOps Proxy API request URL.

    Args:
        url: the page to fetch through the proxy.
        location: country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with api_key, url and country URL-encoded as its query string.
    """
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return f"https://proxy.scrapeops.io/v1/?{query_string}"

Look closely at the payload:

  • "api_key": our ScrapeOps API key.
  • "url": the URL we want to scrape.
  • "country": is the location we wish to appear in.

Here is our crawler now that it's production ready.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

# Read the ScrapeOps API key from config.json in the current working directory.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` in a ScrapeOps Proxy API request URL.

    Args:
        url: the page to fetch through the proxy.
        location: country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with api_key, url and country URL-encoded as its query string.
    """
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return f"https://proxy.scrapeops.io/v1/?{query_string}"


# Log INFO and above for the whole script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One product row from a Walmart search-results page.

    String fields are stripped of surrounding whitespace; a string field that is
    empty after stripping is replaced by the placeholder "No <field name>".
    """
    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace empty ones with "No <field name>"."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the placeholder instead of
            # silently becoming "".
            value = value.strip()
            setattr(self, data_field.name, value if value else f"No {data_field.name}")


class DataPipeline:
    """Collect dataclass records, drop duplicates by ``name``, and append them to a CSV in batches.

    The crawler shares one instance between worker threads, so the queue, the set of
    seen names and the file writes are all guarded by a single lock.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        import threading

        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # True only while save_to_csv() is writing a batch; kept for callers that inspect it.
        self.csv_file_open = False
        # Reentrant because add_data() and close_pipeline() call save_to_csv() while holding it.
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append every queued record to ``csv_filename``; write the header if the file is new or empty."""
        with self._lock:
            self.csv_file_open = True
            try:
                data_to_save = list(self.storage_queue)
                self.storage_queue.clear()
                if not data_to_save:
                    return

                keys = [data_field.name for data_field in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    if not file_exists:
                        writer.writeheader()
                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset even on the empty-queue early return or a write error; previously the
                # flag stayed True and add_data() never flushed again.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same ``name`` was already accepted; otherwise remember it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.add(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate, flushing to disk once the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued; safe to call while other threads are adding data."""
        # Acquiring the lock waits for any in-progress write (the old code slept with an
        # un-imported `time` module instead).
        with self._lock:
            if self.storage_queue:
                self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=120):
    """Fetch one Walmart search-results page through the proxy and queue its products.

    Args:
        keyword: search term, URL-encoded into the ``q`` query parameter.
        location: country code forwarded to the proxy.
        page_number: zero-based page index (the ``page`` URL parameter is page_number + 1).
        data_pipeline: DataPipeline that receives one SearchData per product. Required.
        retries: extra attempts allowed after the first failed one.
        timeout: seconds to wait for the proxy before counting the attempt as failed.

    Raises:
        ValueError: if no data_pipeline is given.
        Exception: "Max Retries exceeded" once every attempt has failed.
    """
    if data_pipeline is None:
        # Fail fast instead of spending every retry on requests whose results cannot be stored.
        raise ValueError("scrape_search_results requires a data_pipeline")

    # urlencode also escapes characters such as "&" or "#" that would corrupt the query
    # string; spaces still become "+", exactly as before.
    query = urlencode({"q": keyword, "page": page_number + 1})
    url = f"https://www.walmart.com/search?{query}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            # Without a timeout a stalled connection would block this worker thread forever.
            response = requests.get(scrapeops_proxy_url, timeout=timeout)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page {page_number}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception("__NEXT_DATA__ script tag not found")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                if item.get("__typename") != "Product":
                    continue

                name = item.get("name")
                if not name:
                    continue
                product_id = item["usItemId"]

                # .get() keeps one incomplete product from failing (and re-fetching) the whole
                # page; the fallbacks equal SearchData's field defaults.
                search_data = SearchData(
                    name=name,
                    stars=item.get("averageRating", 0),
                    url=f"https://www.walmart.com/reviews/product/{product_id}",
                    sponsored=item.get("isSponsoredFlag", False),
                    price=item.get("price", 0.0),
                    product_id=product_id,
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Crawl search-result pages 0..pages-1 for ``keyword`` on a thread pool.

    Each page is handled by scrape_search_results. A page that still fails after its
    retries is logged and skipped, so the remaining pages are unaffected.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # executor.map() only re-raises worker exceptions when its results are iterated;
        # they were never iterated, so failed pages disappeared without a trace.
        future_to_page = {
            executor.submit(scrape_search_results, keyword, location, page_number, data_pipeline, retries): page_number
            for page_number in range(pages)
        }
        for future in concurrent.futures.as_completed(future_to_page):
            try:
                future.result()
            except Exception as e:
                logger.error(f"Page {future_to_page[future]} for '{keyword}' failed: {e}")


if __name__ == "__main__":

    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    # INPUT: keywords to search for; each one produces its own CSV report.
    keyword_list = ["laptop"]
    aggregate_files = []

    for keyword in keyword_list:
        report_file = f"{keyword.replace(' ', '-')}.csv"
        crawl_pipeline = DataPipeline(csv_filename=report_file)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        # Flush whatever is still buffered before moving on to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_file)
    logger.info(f"Crawl complete.")

Step 6: Production Run

Now, let's run this thing in production! Here is the main block we'll be working with. We're crawling 4 pages on 3 threads.

Feel free to change any of these to tweak your results:

  • MAX_RETRIES: Defines the maximum number of times the script will retry fetching a webpage if a request fails due to issues such as network timeouts or non-200 HTTP responses.
  • MAX_THREADS: Sets the maximum number of threads that will be used concurrently while scraping.
  • PAGES: The number of search result pages to scrape for each keyword.
  • LOCATION: The country code passed to the proxy; our requests appear to come from this country.
  • keyword_list: A list of product keywords to search for on Walmart’s website (e.g., ["laptop"]).
if __name__ == "__main__":

    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    # INPUT: keywords to search for; each one produces its own CSV report.
    keyword_list = ["laptop"]
    aggregate_files = []

    for keyword in keyword_list:
        report_file = f"{keyword.replace(' ', '-')}.csv"
        crawl_pipeline = DataPipeline(csv_filename=report_file)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        # Flush whatever is still buffered before moving on to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_file)
    logger.info(f"Crawl complete.")

Here are the results.

Crawler Performance Terminal

We scraped 4 pages in 10.092 seconds. 10.092 seconds / 4 pages = 2.523 seconds per page. Our crawl generated a report with 164 different laptop items.


Build A Walmart Scraper

When we build our scraper, we're going to follow a similar plan to what we did with the crawler. Our scraper will perform the following steps in order.

  • Read the CSV file into an array.
  • Parse the individual items from our array.
  • Store the scraped data from the parse.
  • Utilize concurrency to handle scraping multiple webpages at once.
  • Use the ScrapeOps Proxy API to once again get past anti-bots.

Step 1: Create Simple Business Data Parser

Like we did earlier, we're going to start with our basic parsing function. We'll add the basic structure, including error handling and retry logic. Once again, pay close attention to the parsing inside the function. This is where the magic happens.

def process_item(row, location, retries=3, timeout=120):
    """Fetch a product's review page and print each review as a dict.

    Args:
        row: crawler CSV row; its "url" column is the review page to fetch.
        location: country code; not used by this version of the function.
        retries: extra attempts allowed after the first failed one.
        timeout: seconds to wait for the server before counting the attempt as failed.

    Raises:
        Exception: "Max Retries exceeded" once every attempt has failed.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Inside the try so connection errors and timeouts are retried like bad status
            # codes instead of escaping the retry loop.
            response = requests.get(url, timeout=timeout)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception("__NEXT_DATA__ script tag not found")
            json_data = json.loads(script_tag.text)
            review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"]

            for review in review_list:
                review_data = {
                    "name": review["userNickname"],
                    "author_id": review["authorId"],
                    "rating": review["rating"],
                    "date": review["reviewSubmissionTime"],
                    "review": review["reviewText"],
                }
                print(review_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
  • We find our JSON exactly the same way we did earlier: soup.select_one("script[id='__NEXT_DATA__'][type='application/json']").
  • json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"] finds our list of customer reviews.
  • We iterate through the reviews and pull the following data:
    • name: the name of the reviewer.
    • author_id: a unique identifier for the reviewer, much like our product_id from earlier.
    • rating: the rating left by the reviewer.
    • date: the date that the review was left.
    • review: the actual text of the review, for instance "It was good. I really like [x] about this laptop."

Step 2: Loading URLs To Scrape

To scrape these reviews, we need to feed URLs into our parsing function. To get them, we read the CSV report generated by the crawler.

Here, we're going to write another function, similar to start_scrape() from earlier.

def process_results(csv_file, location, retries=3):
    """Scrape the reviews of every product listed in a crawler CSV report, one row at a time.

    Any exception raised by process_item for a row propagates and stops the loop.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its reports as UTF-8; without an explicit encoding the platform
    # default (e.g. cp1252 on Windows) would mangle or reject non-ASCII product names.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_item(row, location, retries=retries)

After we've read the CSV, we call process_item() on each row from the file using a for loop. We'll replace the for loop later on, when we add concurrency. You can see our fully updated code below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

# Read the ScrapeOps API key from config.json in the current working directory.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` in a ScrapeOps Proxy API request URL.

    Args:
        url: the page to fetch through the proxy.
        location: country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with api_key, url and country URL-encoded as its query string.
    """
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return f"https://proxy.scrapeops.io/v1/?{query_string}"


# Log INFO and above for the whole script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One product row from a Walmart search-results page.

    String fields are stripped of surrounding whitespace; a string field that is
    empty after stripping is replaced by the placeholder "No <field name>".
    """
    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace empty ones with "No <field name>"."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the placeholder instead of
            # silently becoming "".
            value = value.strip()
            setattr(self, data_field.name, value if value else f"No {data_field.name}")


class DataPipeline:
    """Collect dataclass records, drop duplicates by ``name``, and append them to a CSV in batches.

    The crawler shares one instance between worker threads, so the queue, the set of
    seen names and the file writes are all guarded by a single lock.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        import threading

        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # True only while save_to_csv() is writing a batch; kept for callers that inspect it.
        self.csv_file_open = False
        # Reentrant because add_data() and close_pipeline() call save_to_csv() while holding it.
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append every queued record to ``csv_filename``; write the header if the file is new or empty."""
        with self._lock:
            self.csv_file_open = True
            try:
                data_to_save = list(self.storage_queue)
                self.storage_queue.clear()
                if not data_to_save:
                    return

                keys = [data_field.name for data_field in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    if not file_exists:
                        writer.writeheader()
                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset even on the empty-queue early return or a write error; previously the
                # flag stayed True and add_data() never flushed again.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same ``name`` was already accepted; otherwise remember it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.add(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate, flushing to disk once the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued; safe to call while other threads are adding data."""
        # Acquiring the lock waits for any in-progress write (the old code slept with an
        # un-imported `time` module instead).
        with self._lock:
            if self.storage_queue:
                self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=120):
    """Fetch one Walmart search-results page through the proxy and queue its products.

    Args:
        keyword: search term, URL-encoded into the ``q`` query parameter.
        location: country code forwarded to the proxy.
        page_number: zero-based page index (the ``page`` URL parameter is page_number + 1).
        data_pipeline: DataPipeline that receives one SearchData per product. Required.
        retries: extra attempts allowed after the first failed one.
        timeout: seconds to wait for the proxy before counting the attempt as failed.

    Raises:
        ValueError: if no data_pipeline is given.
        Exception: "Max Retries exceeded" once every attempt has failed.
    """
    if data_pipeline is None:
        # Fail fast instead of spending every retry on requests whose results cannot be stored.
        raise ValueError("scrape_search_results requires a data_pipeline")

    # urlencode also escapes characters such as "&" or "#" that would corrupt the query
    # string; spaces still become "+", exactly as before.
    query = urlencode({"q": keyword, "page": page_number + 1})
    url = f"https://www.walmart.com/search?{query}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            # Without a timeout a stalled connection would block this worker thread forever.
            response = requests.get(scrapeops_proxy_url, timeout=timeout)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page {page_number}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception("__NEXT_DATA__ script tag not found")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                if item.get("__typename") != "Product":
                    continue

                name = item.get("name")
                if not name:
                    continue
                product_id = item["usItemId"]

                # .get() keeps one incomplete product from failing (and re-fetching) the whole
                # page; the fallbacks equal SearchData's field defaults.
                search_data = SearchData(
                    name=name,
                    stars=item.get("averageRating", 0),
                    url=f"https://www.walmart.com/reviews/product/{product_id}",
                    sponsored=item.get("isSponsoredFlag", False),
                    price=item.get("price", 0.0),
                    product_id=product_id,
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Crawl search-result pages 0..pages-1 for ``keyword`` on a thread pool.

    Each page is handled by scrape_search_results. A page that still fails after its
    retries is logged and skipped, so the remaining pages are unaffected.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # executor.map() only re-raises worker exceptions when its results are iterated;
        # they were never iterated, so failed pages disappeared without a trace.
        future_to_page = {
            executor.submit(scrape_search_results, keyword, location, page_number, data_pipeline, retries): page_number
            for page_number in range(pages)
        }
        for future in concurrent.futures.as_completed(future_to_page):
            try:
                future.result()
            except Exception as e:
                logger.error(f"Page {future_to_page[future]} for '{keyword}' failed: {e}")


def process_item(row, location, retries=3, timeout=120):
    """Fetch a product's review page and print each review as a dict.

    Args:
        row: crawler CSV row; its "url" column is the review page to fetch.
        location: country code; not used by this version of the function.
        retries: extra attempts allowed after the first failed one.
        timeout: seconds to wait for the server before counting the attempt as failed.

    Raises:
        Exception: "Max Retries exceeded" once every attempt has failed.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Inside the try so connection errors and timeouts are retried like bad status
            # codes instead of escaping the retry loop.
            response = requests.get(url, timeout=timeout)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception("__NEXT_DATA__ script tag not found")
            json_data = json.loads(script_tag.text)
            review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"]

            for review in review_list:
                review_data = {
                    "name": review["userNickname"],
                    "author_id": review["authorId"],
                    "rating": review["rating"],
                    "date": review["reviewSubmissionTime"],
                    "review": review["reviewText"],
                }
                print(review_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    """Scrape the reviews of every product listed in a crawler CSV report, one row at a time.

    Any exception raised by process_item for a row propagates and stops the loop.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its reports as UTF-8; without an explicit encoding the platform
    # default (e.g. cp1252 on Windows) would mangle or reject non-ASCII product names.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_item(row, location, retries=retries)

if __name__ == "__main__":

    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    # INPUT: keywords to search for; each one produces its own CSV report.
    keyword_list = ["laptop"]
    aggregate_files = []

    for keyword in keyword_list:
        report_file = f"{keyword.replace(' ', '-')}.csv"
        crawl_pipeline = DataPipeline(csv_filename=report_file)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        # Flush whatever is still buffered before moving on to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_file)
    logger.info(f"Crawl complete.")

    # Second stage: scrape the reviews for every product in each crawl report.
    for report_file in aggregate_files:
        process_results(report_file, LOCATION, retries=MAX_RETRIES)
  • process_results() reads our CSV into an array and iterates through it.
  • As we iterate, we call process_item() on each item from the array to scrape its reviews.

Step 3: Storing the Scraped Data

As mentioned earlier, scraping without storing is a waste of time. We already have our DataPipeline that can take in a dataclass, but the only class we have is SearchData.

To address this, we're going to add another one called ReviewData. Then, from within our parsing function, we'll create a new DataPipeline and pass ReviewData objects into it while we parse.

Take a look at ReviewData.

@dataclass
class ReviewData:
    """One customer review scraped from a Walmart product-review page.

    String fields are stripped of surrounding whitespace; a string field that is
    empty after stripping is replaced by the placeholder "No <field name>".
    """
    name: str = ""
    author_id: str = ""
    rating: int = 0
    date: str = ""
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace empty ones with "No <field name>"."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the placeholder instead of
            # silently becoming "".
            value = value.strip()
            setattr(self, data_field.name, value if value else f"No {data_field.name}")

ReviewData holds all of the data we extracted during the parse. You can see how everything works in the full code below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

# Read the ScrapeOps API key from config.json in the current working directory.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` in a ScrapeOps Proxy API request URL.

    Args:
        url: the page to fetch through the proxy.
        location: country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with api_key, url and country URL-encoded as its query string.
    """
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return f"https://proxy.scrapeops.io/v1/?{query_string}"


# Log INFO and above for the whole script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One product row from a Walmart search-results page.

    String fields are stripped of surrounding whitespace; a string field that is
    empty after stripping is replaced by the placeholder "No <field name>".
    """
    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace empty ones with "No <field name>"."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the placeholder instead of
            # silently becoming "".
            value = value.strip()
            setattr(self, data_field.name, value if value else f"No {data_field.name}")

@dataclass
class ReviewData:
    """One customer review scraped from a Walmart product-review page.

    String fields are stripped of surrounding whitespace; a string field that is
    empty after stripping is replaced by the placeholder "No <field name>".
    """
    name: str = ""
    author_id: str = ""
    rating: int = 0
    date: str = ""
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace empty ones with "No <field name>"."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the placeholder instead of
            # silently becoming "".
            value = value.strip()
            setattr(self, data_field.name, value if value else f"No {data_field.name}")


class DataPipeline:
    """Collect dataclass records, drop duplicates by ``name``, and append them to a CSV in batches.

    The crawler shares one instance between worker threads, so the queue, the set of
    seen names and the file writes are all guarded by a single lock.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        import threading

        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # True only while save_to_csv() is writing a batch; kept for callers that inspect it.
        self.csv_file_open = False
        # Reentrant because add_data() and close_pipeline() call save_to_csv() while holding it.
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append every queued record to ``csv_filename``; write the header if the file is new or empty."""
        with self._lock:
            self.csv_file_open = True
            try:
                data_to_save = list(self.storage_queue)
                self.storage_queue.clear()
                if not data_to_save:
                    return

                keys = [data_field.name for data_field in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    if not file_exists:
                        writer.writeheader()
                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset even on the empty-queue early return or a write error; previously the
                # flag stayed True and add_data() never flushed again.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same ``name`` was already accepted; otherwise remember it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.add(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate, flushing to disk once the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued; safe to call while other threads are adding data."""
        # Acquiring the lock waits for any in-progress write (the old code slept with an
        # un-imported `time` module instead).
        with self._lock:
            if self.storage_queue:
                self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=120):
    """Fetch one Walmart search-results page through the proxy and queue its products.

    Args:
        keyword: search term, URL-encoded into the ``q`` query parameter.
        location: country code forwarded to the proxy.
        page_number: zero-based page index (the ``page`` URL parameter is page_number + 1).
        data_pipeline: DataPipeline that receives one SearchData per product. Required.
        retries: extra attempts allowed after the first failed one.
        timeout: seconds to wait for the proxy before counting the attempt as failed.

    Raises:
        ValueError: if no data_pipeline is given.
        Exception: "Max Retries exceeded" once every attempt has failed.
    """
    if data_pipeline is None:
        # Fail fast instead of spending every retry on requests whose results cannot be stored.
        raise ValueError("scrape_search_results requires a data_pipeline")

    # urlencode also escapes characters such as "&" or "#" that would corrupt the query
    # string; spaces still become "+", exactly as before.
    query = urlencode({"q": keyword, "page": page_number + 1})
    url = f"https://www.walmart.com/search?{query}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            # Without a timeout a stalled connection would block this worker thread forever.
            response = requests.get(scrapeops_proxy_url, timeout=timeout)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page {page_number}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception("__NEXT_DATA__ script tag not found")
            json_data = json.loads(script_tag.text)
            item_list = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"][0]["items"]

            for item in item_list:
                if item.get("__typename") != "Product":
                    continue

                name = item.get("name")
                if not name:
                    continue
                product_id = item["usItemId"]

                # .get() keeps one incomplete product from failing (and re-fetching) the whole
                # page; the fallbacks equal SearchData's field defaults.
                search_data = SearchData(
                    name=name,
                    stars=item.get("averageRating", 0),
                    url=f"https://www.walmart.com/reviews/product/{product_id}",
                    sponsored=item.get("isSponsoredFlag", False),
                    price=item.get("price", 0.0),
                    product_id=product_id,
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Crawl search-result pages 0..pages-1 for ``keyword`` on a thread pool.

    Each page is handled by scrape_search_results. A page that still fails after its
    retries is logged and skipped, so the remaining pages are unaffected.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # executor.map() only re-raises worker exceptions when its results are iterated;
        # they were never iterated, so failed pages disappeared without a trace.
        future_to_page = {
            executor.submit(scrape_search_results, keyword, location, page_number, data_pipeline, retries): page_number
            for page_number in range(pages)
        }
        for future in concurrent.futures.as_completed(future_to_page):
            try:
                future.result()
            except Exception as e:
                logger.error(f"Page {future_to_page[future]} for '{keyword}' failed: {e}")


def process_item(row, location, retries=3, timeout=120):
    """Fetch a product's review page and append each review to a per-product CSV.

    The CSV is named after row["name"], with spaces and characters that are illegal in
    file names (/ \\ : * ? " < > |) replaced by "-".

    Args:
        row: crawler CSV row; its "url" column is the review page to fetch.
        location: country code; not used by this version of the function.
        retries: extra attempts allowed after the first failed one.
        timeout: seconds to wait for the server before counting the attempt as failed.

    Raises:
        Exception: "Max Retries exceeded" once every attempt has failed.
    """
    url = row["url"]
    # A product name containing "/" (or a Windows-illegal character) used to produce a path
    # that open() rejected on every retry. Names without such characters map exactly as before.
    safe_name = row["name"].translate(str.maketrans({char: "-" for char in ' /\\:*?"<>|'}))
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Inside the try so connection errors and timeouts are retried like bad status
            # codes instead of escaping the retry loop.
            response = requests.get(url, timeout=timeout)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            review_pipeline = DataPipeline(csv_filename=f"{safe_name}.csv")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                raise Exception("__NEXT_DATA__ script tag not found")
            json_data = json.loads(script_tag.text)
            review_list = json_data["props"]["pageProps"]["initialData"]["data"]["reviews"]["customerReviews"]

            for review in review_list:
                review_data = ReviewData(
                    name=review["userNickname"],
                    author_id=review["authorId"],
                    rating=review["rating"],
                    date=review["reviewSubmissionTime"],
                    review=review["reviewText"],
                )
                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    """Scrape the reviews of every product listed in a crawler CSV report, one row at a time.

    Any exception raised by process_item for a row propagates and stops the loop.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its reports as UTF-8; without an explicit encoding the platform
    # default (e.g. cp1252 on Windows) would mangle or reject non-ASCII product names.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_item(row, location, retries=retries)

if __name__ == "__main__":

    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    # INPUT: keywords to search for; each one produces its own CSV report.
    keyword_list = ["laptop"]
    aggregate_files = []

    for keyword in keyword_list:
        report_file = f"{keyword.replace(' ', '-')}.csv"
        crawl_pipeline = DataPipeline(csv_filename=report_file)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        # Flush whatever is still buffered before moving on to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_file)
    logger.info(f"Crawl complete.")

    # Second stage: scrape the reviews for every product in each crawl report.
    for report_file in aggregate_files:
        process_results(report_file, LOCATION, retries=MAX_RETRIES)

From within our parsing function, we now:

  • Open a DataPipeline.
  • Pass ReviewData objects into the pipeline as we parse them.
  • Close the pipeline once we're finished parsing.

Step 4: Adding Concurrency

We'll add concurrency almost exactly the same way we did before. ThreadPoolExecutor will open up a new pool of threads. We'll pass our parsing function in as the first argument, and then everything else gets passed in as an array to then get passed into our parser.

Here is the finished process_results().

def process_results(csv_file, location, max_threads=5, retries=3):
    """Scrape the reviews of every product in a crawler CSV report on a thread pool.

    A product whose review scrape still fails after its retries is logged and skipped;
    the other products are unaffected.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its reports as UTF-8; without an explicit encoding the platform
    # default (e.g. cp1252 on Windows) would mangle or reject non-ASCII product names.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # executor.map() only re-raises worker exceptions when its results are iterated;
        # they were never iterated, so failures disappeared without a trace.
        future_to_row = {executor.submit(process_item, row, location, retries): row for row in reader}
        for future in concurrent.futures.as_completed(future_to_row):
            try:
                future.result()
            except Exception as e:
                logger.error(f"Failed to scrape reviews from {future_to_row[future].get('url')}: {e}")

Our CSV reading logic stays the same, but the parse is now running on multiple threads concurrently.


Step 5: Bypassing Anti-Bots

We'll bypass anti-bots exactly the same way we did earlier. We'll call get_scrapeops_url() from within our parser, so every request we make during the review scrape goes through the proxy.

# Build the proxied URL for this review page; pass it to requests.get() instead of the raw url.
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

Here is our code now that it's ready for production.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup

import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

# Read the ScrapeOps API key from config.json in the current working directory.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` in a ScrapeOps Proxy API request URL.

    Args:
        url: the page to fetch through the proxy.
        location: country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with api_key, url and country URL-encoded as its query string.
    """
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return f"https://proxy.scrapeops.io/v1/?{query_string}"


# Log INFO and above for the whole script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One product row from a Walmart search-results page.

    String fields are stripped of surrounding whitespace; a string field that is
    empty after stripping is replaced by the placeholder "No <field name>".
    """
    name: str = ""
    stars: float = 0
    url: str = ""
    sponsored: bool = False
    price: float = 0.0
    product_id: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace empty ones with "No <field name>"."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the placeholder instead of
            # silently becoming "".
            value = value.strip()
            setattr(self, data_field.name, value if value else f"No {data_field.name}")

@dataclass
class ReviewData:
    """One customer review scraped from a Walmart product-review page.

    String fields are stripped of surrounding whitespace; a string field that is
    empty after stripping is replaced by the placeholder "No <field name>".
    """
    name: str = ""
    author_id: str = ""
    rating: int = 0
    date: str = ""
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace empty ones with "No <field name>"."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the placeholder instead of
            # silently becoming "".
            value = value.strip()
            setattr(self, data_field.name, value if value else f"No {data_field.name}")


class DataPipeline:
    """Collect dataclass records, drop duplicates by ``name``, and append them to a CSV in batches.

    The crawler shares one instance between worker threads, so the queue, the set of
    seen names and the file writes are all guarded by a single lock.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        import threading

        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # True only while save_to_csv() is writing a batch; kept for callers that inspect it.
        self.csv_file_open = False
        # Reentrant because add_data() and close_pipeline() call save_to_csv() while holding it.
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append every queued record to ``csv_filename``; write the header if the file is new or empty."""
        with self._lock:
            self.csv_file_open = True
            try:
                data_to_save = list(self.storage_queue)
                self.storage_queue.clear()
                if not data_to_save:
                    return

                keys = [data_field.name for data_field in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    if not file_exists:
                        writer.writeheader()
                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset even on the empty-queue early return or a write error; previously the
                # flag stayed True and add_data() never flushed again.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same ``name`` was already accepted; otherwise remember it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.add(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate, flushing to disk once the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued; safe to call while other threads are adding data."""
        # Acquiring the lock waits for any in-progress write (the old code slept with an
        # un-imported `time` module instead).
        with self._lock:
            if self.storage_queue:
                self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Fetch one Walmart search-results page through the proxy and queue its products.

    Args:
        keyword: Search term; URL-encoded into the ``q`` query parameter.
        location: Country code forwarded to ``get_scrapeops_url``.
        page_number: Zero-based page index (the ``page`` query parameter sent is
            ``page_number + 1``).
        data_pipeline: Object whose ``add_data`` receives one ``SearchData`` per
            product. If ``None``, the page is still fetched and parsed but
            nothing is stored.
        retries: Number of extra attempts after the first one fails.

    Raises:
        Exception: If no attempt succeeds (``retries + 1`` attempts in total).
    """
    # urlencode escapes characters such as "&" or "#" that a plain
    # space-to-plus replacement would leave raw in the URL.
    query = urlencode({"q": keyword, "page": page_number + 1})
    url = f"https://www.walmart.com/search?{query}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed to get page {page_number}, status code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__NEXT_DATA__'][type='application/json']")
            if script_tag is None:
                # No embedded JSON on the page: raise so the retry loop makes another attempt.
                raise ValueError(f"No __NEXT_DATA__ script found on page {page_number}")
            json_data = json.loads(script_tag.text)
            item_stacks = json_data["props"]["pageProps"]["initialData"]["searchResult"]["itemStacks"]
            # Guard against an empty stack list (presumably a search with no results).
            item_list = item_stacks[0]["items"] if item_stacks else []

            for item in item_list:
                if item.get("__typename") != "Product":
                    continue

                name = item.get("name")
                product_id = item.get("usItemId")
                # Both are required: name identifies the row, the id builds the review URL.
                if not name or not product_id:
                    continue

                search_data = SearchData(
                    name=name,
                    # Optional fields fall back to SearchData's own defaults so one
                    # incomplete product does not fail (and re-fetch) the whole page.
                    stars=item.get("averageRating", 0),
                    url=f"https://www.walmart.com/reviews/product/{product_id}",
                    sponsored=item.get("isSponsoredFlag", False),
                    price=item.get("price", 0.0),
                    product_id=product_id,
                )
                if data_pipeline is not None:
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape search-result pages ``0 .. pages-1`` for ``keyword`` concurrently.

    Each page is handled by ``scrape_search_results`` on a thread pool of
    ``max_threads`` workers, all sharing ``data_pipeline``.

    A page that still fails after its retries is logged and skipped. The
    previous ``executor.map`` call never consumed its results, so such
    failures were silently discarded.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        future_to_page = {
            executor.submit(
                scrape_search_results,
                keyword,
                location,
                page_number,
                data_pipeline=data_pipeline,
                retries=retries,
            ): page_number
            for page_number in range(pages)
        }
        for future in concurrent.futures.as_completed(future_to_page):
            try:
                future.result()
            except Exception as e:
                # Log and continue so the remaining pages (and the caller's
                # pipeline flush) still run.
                logger.error(f"Page {future_to_page[future]} failed: {e}")


def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup