How to Scrape Amazon With Python Requests
Amazon is the world's largest online retailer and one of the largest retailers overall.
Whether you want to track product prices, analyze customer reviews, or monitor competitors, extracting information from Amazon can provide valuable insights and opportunities.
In this guide, we'll take you through how to scrape Amazon using Python Requests and BeautifulSoup.
- TLDR: How to Scrape Amazon
- How To Architect Our Scraper
- Understanding How To Scrape Amazon
- Setting Up Our Amazon Scraper
- Build a Search Results Crawler
- Build a Product Parser
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
If you prefer to follow along with a video, then check out the video tutorial version here:
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Amazon
If you are looking for a production-ready Amazon scraper, follow the script below:
import requests
from bs4 import BeautifulSoup
import logging, os, time
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
name: str = ""
title: str = ""
url: str = ""
is_ad: bool = False
pricing_unit: str = ""
price: float = None
real_price: float = None
rating: float = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ProductPageData:
name: str = ""
title: str = ""
url: str = ""
pricing_unit: str = ""
price: float = None
feature_1: str = ""
feature_2: str = ""
feature_3: str = ""
feature_4: str = ""
images_1: str = ""
images_2: str = ""
images_3: str = ""
images_4: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename='', storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
tries = 0
success = False
while tries < retries and not success:
try:
url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location)
resp = requests.get(url)
if resp.status_code == 200:
logger.info("Successfully fetched page")
soup = BeautifulSoup(resp.text, "html.parser")
bad_divs = soup.find_all("div", class_="AdHolder")
for bad_div in bad_divs:
bad_div.decompose()
divs = soup.find_all("div")
last_title = ""
for div in divs:
parsable = True if div is not None else False
h2 = div.find("h2")
if h2 and h2.text.strip() and parsable:
title = h2.text.strip()
a = h2.find("a")
product_url = a.get("href") if a else ""
ad_status = False
if "sspa" in product_url:
ad_status = True
asin = div.get("data-asin")
symbol_element = div.find("span", class_="a-price-symbol")
symbol_presence = symbol_element.text if symbol_element else None
if symbol_presence is not None:
pricing_unit = symbol_presence
prices = div.find_all("span", class_="a-offscreen")
rating_element = div.find("span", class_="a-icon-alt")
rating_present = rating_element.text[0:3] if rating_element else "0.0"
rating = float(rating_present)
price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
price = float(price_present) if price_present else 0.0
real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
if symbol_presence and rating_present and price_present:
product = ProductData(
name=asin,
title=title,
url=product_url,
is_ad=ad_status,
pricing_unit=pricing_unit,
price=price,
real_price=real_price,
rating=rating
)
data_pipeline.add_data(product)
last_title = title
else:
continue
success = True
else:
raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}")
except Exception as e:
logger.warning(f"Failed to scrape page, {e}")
tries += 1
if not success:
logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
print(f"Exited scrape_products for :{product_name}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
pages = list(range(1, pages+1))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(
search_products,
[product_name] * len(pages),
pages,
[location] * len(pages),
[retries] * len(pages),
[search_pipeline] * len(pages)
)
search_pipeline.close_pipeline()
def parse_product(product_object, location="us", retries=3):
url = product_object["url"]
tries = 0
success = False
product_url = f"https://www.amazon.com/{url}"
url_array = product_url.split("/")
title = url_array[-4]
product_pipeline = DataPipeline(csv_filename=f"{title}.csv")
asin = url_array[-2]
while tries <= retries and not success:
try:
resp = requests.get(get_scrapeops_url(product_url, location=location))
if resp.status_code == 200:
soup = BeautifulSoup(resp.text, "html.parser")
#find all the images
spans = soup.find_all("span")
images_to_save = []
for span in spans:
image_array = span.find_all("span")
for item in image_array:
image_span = item.find("span")
if image_span is not None:
images = image_span.find_all("img")
for image in images:
image_link = image.get("src")
if "https://m.media-amazon.com/images/" in image_link not in images_to_save:
images_to_save.append(image_link)
features = []
feature_bullets = soup.find_all("li", class_="a-spacing-mini")
for feature in feature_bullets:
text = feature.find("span").text
if text not in features:
features.append(text)
price_symbol = soup.find("span", class_="a-price-symbol").text
whole_number = soup.find("span", class_="a-price-whole").text.replace(",", "").replace(".", "")
decimal = soup.find("span", class_="a-price-fraction").text
price = float(f"{whole_number}.{decimal}")
item_data = ProductPageData(
name=asin,
title=title,
url=product_url,
pricing_unit=price_symbol,
price=price,
feature_1=features[0] if len(features) > 0 else "n/a",
feature_2=features[1] if len(features) > 1 else "n/a",
feature_3=features[2] if len(features) > 2 else "n/a",
feature_4=features[3] if len(features) > 3 else "n/a",
images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a",
images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a",
images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a",
images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a"
)
product_pipeline.add_data(item_data)
product_pipeline.close_pipeline()
success = True
else:
raise Exception(f"Failed response from server, status code: {resp.status_code}")
except Exception as e:
logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
tries += 1
return None
def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
with open(csv_filename) as csvfile:
reader = list(csv.DictReader(csvfile))
print(len(reader))
with ThreadPoolExecutor(max_workers=threads) as executor:
executor.map(parse_product, reader, [location] * len(reader), [retries] * len(reader))
if __name__ == "__main__":
PRODUCTS = ["phone"]
AGGREGATE_PRODUCTS = []
MAX_RETRIES = 2
PAGES = 20
MAX_THREADS = 3
LOCATION = "us"
for product in PRODUCTS:
threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
filename = f"{product}.csv"
AGGREGATE_PRODUCTS.append(filename)
for product in AGGREGATE_PRODUCTS:
threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
The code above gives you a production-ready Amazon scraper fully integrated with the ScrapeOps Proxy API. To change your results, simply change your constants.
If you want detailed results on only one page of a search, change PAGES to 1. If you wish to run with 10 threads, change MAX_THREADS to 10. Use caution with this one: each thread opens up another page through the proxy, and the ScrapeOps proxy does have a concurrency limit.
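For example, a run that only crawls the first results page of each search (a sketch using the exact same constants as above, just with different values) might configure the main block like this:
if __name__ == "__main__":
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []
    MAX_RETRIES = 2
    PAGES = 1          # only the first page of search results per product
    MAX_THREADS = 3    # keep this below your ScrapeOps concurrency limit
    LOCATION = "us"
    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        AGGREGATE_PRODUCTS.append(f"{product}.csv")
    for product in AGGREGATE_PRODUCTS:
        threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)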
How To Architect Our Amazon Scraper
When we scrape Amazon, we need to scrape both search results and individual page data. When we search on Amazon, we get a bunch of pages and each page has a bunch of results.
Each item in our search also has its own page containing specific details about the item. You can get a better feel for these things if you take a look at the images below.
Results Page
The Results page holds most of the information we want to scrape such as the product name, sale price, real price, and rating.
Product Page
The Product page holds much of the information we already find on the Results page, and more. In our case specifically, the Product page holds bullet points describing the item and images of said item.
We find a phone we're interested in using the results. We learn more about that phone from our product page.
Understanding How To Scrape Amazon
Before plunging head first into code, we're going to talk about how our scraper works on a high level. In this section, we're going over the required steps in greater detail. If you've got some experience in web scraping already, feel free to skip this section.
Step 1: How To Request Amazon Pages
Let's take a better look at the URL from the page we looked at earlier.
We are interested in this part of the URL:
https://www.amazon.com/s?k=phone
- https://www.amazon.com/ is our base URL.
- s? shows that we're performing a search query.
- k=phone tells the Amazon server that we want to look at phones.
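To see this in action, here's a minimal sketch of requesting that search URL with Python Requests. The User-Agent header is just an assumption to make the request look more like a normal browser; as we'll see shortly, Amazon may still block unproxied requests like this.
import requests

# Minimal, hypothetical request to the search URL described above
url = "https://www.amazon.com/s?k=phone"
headers = {"User-Agent": "Mozilla/5.0"}  # assumption: a browser-like header
resp = requests.get(url, headers=headers)
print(resp.status_code)  # 200 means we received the page; anything else usually means a block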
Step 2: How To Extract Data From Amazon Pages
While some sites store their data conveniently in a JSON blob, Amazon does not. Amazon nests their data deeply within divs and spans. To extract our data, we need to pull it from these elements nested within the HTML.
Let's first take a look at the Results page. Below, you can see an item title with the inspect window open. If you look closely, you'll see the title text is nested within a span element.
Now, let's take a look at the product page. Look closely here as well. Our feature bullets are actually span elements nested within li (list) elements.
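To make this concrete, here's a small sketch of pulling a title out of that nested structure with BeautifulSoup. The html string below is a simplified placeholder, not real Amazon markup.
from bs4 import BeautifulSoup

# Simplified placeholder markup mimicking a nested result title
html = "<div><h2><a href='/example/dp/B000000000'><span>Example Phone</span></a></h2></div>"
soup = BeautifulSoup(html, "html.parser")

h2 = soup.find("h2")
if h2:
    print(h2.text.strip())                   # Example Phone
    link = h2.find("a")
    print(link.get("href") if link else "")  # /example/dp/B000000000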
Step 3: How To Control Pagination
Controlling our pagination is very easy. It just requires an additional parameter to our URL. When pagination is added in, our URL will look like this:
https://www.amazon.com/s?k={product_name}&page={page_number}
So if we want to search page 1 of phones, this would be our URL:
https://www.amazon.com/s?k=phone&page=1
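As a quick sketch, here's how you might generate the first few page URLs for a search term (plain string formatting, nothing Amazon-specific assumed):
product_name = "phone"
for page_number in range(1, 4):
    print(f"https://www.amazon.com/s?k={product_name}&page={page_number}")
# https://www.amazon.com/s?k=phone&page=1
# https://www.amazon.com/s?k=phone&page=2
# https://www.amazon.com/s?k=phone&page=3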
Step 4: Geolocated Data
Amazon does serve different content based on our location. If we're in the US, prices will be denoted in dollars ($). If we're in the UK, Amazon will give us our prices in pounds (GBP).
To control our location effectively, we'll be using the ScrapeOps Proxy API. The ScrapeOps API will route our traffic through servers in whichever country we ask for.
If we want to be in the UK, ScrapeOps will put us in the UK. If we want to be from the US, ScrapeOps will route us through servers in the US.
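Later in this guide, this becomes the get_scrapeops_url() function; the location is simply passed as a country parameter on the proxy request. Here is that same helper as a quick sketch, with two locations side by side:
from urllib.parse import urlencode

API_KEY = "YOUR-SUPER-SECRET-API-KEY"

def get_scrapeops_url(url, location="us"):
    # "country" tells the ScrapeOps Proxy API which country to route the request through
    payload = {"api_key": API_KEY, "url": url, "country": location}
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# The same search page, viewed from two different countries
print(get_scrapeops_url("https://www.amazon.com/s?k=phone", location="us"))
print(get_scrapeops_url("https://www.amazon.com/s?k=phone", location="uk"))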
Setting Up Our Amazon Scraper Project
Now that we know what we want to do, let's get started on building our scraper. First we'll make a new project folder, and then we'll initialize a virtual environment and install dependencies.
Create a New Folder
mkdir amazon-scraper
From inside your new folder, create a new virtual environment.
Create a New Virtual Environment
python -m venv venv
Activate the Virtual Environment
source venv/bin/activate
Install Dependencies
pip install requests
pip install beautifulsoup4
Build An Amazon Search Crawler
The first portion of our project will be spent building a crawler to scrape Amazon search results. This crawler will actually be grabbing the bulk of our data. This crawler needs to:
- parse results
- manage result batches using pagination
- store results from those pages
- search multiple pages concurrently
- integrate with a proxy for both location support and anti-bot resistance
Step 1: Create Simple Search Data Parser
Let's get started by creating a crawler that simply parses a Results page. Here is a scraper with a simple parsing function.
- The parsing function below first finds all the div elements on the page.
- Then it checks if each div is parsable.
- If the div is parsable, we find the h2 element and strip out the whitespace and newlines.
- If our stripped text doesn't hold a None value, we move on and extract the following from each listing: asin, title, url, is_ad, pricing_unit, price, real_price, rating.
import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
def search_products(product_name: str, retries=3):
tries = 0
success = False
while tries < retries and not success:
try:
url = f"https://www.amazon.com/s?k={product_name}"
resp = requests.get(url)
if resp.status_code == 200:
logger.info("Successfully fetched page")
soup = BeautifulSoup(resp.text, "html.parser")
divs = soup.find_all("div")
last_title = ""
for div in divs:
parsable = True if div is not None else False
h2 = div.find("h2")
if h2 and h2.text.strip() and h2.text.strip() != last_title and parsable:
title = h2.text.strip()
a = h2.find("a")
product_url = a.get("href") if a else ""
ad_status = False
if "sspa" in product_url:
ad_status = True
asin = div.get("data-asin")
symbol_element = div.find("span", class_="a-price-symbol")
symbol_presence = symbol_element.text if symbol_element else None
if symbol_presence is not None:
pricing_unit = symbol_presence
prices = div.find_all("span", class_="a-offscreen")
rating_element = div.find("span", class_="a-icon-alt")
rating_present = rating_element.text[0:3] if rating_element else "0.0"
rating = float(rating_present)
price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
price = float(price_present) if price_present else 0.0
real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
if symbol_presence and rating_present and price_present:
product = {
"name": asin,
"title": title,
"url": product_url,
"is_ad": ad_status,
"pricing_unit": pricing_unit,
"price": price,
"real_price": real_price,
"rating": rating
}
print(product)
last_title = title
else:
continue
success = True
else:
raise Exception(f"Failed to scrape the page, Status Code {resp.status_code}, tries left: {retries-tries}")
except Exception as e:
logger.warning(f"Failed to scrape page, {e}")
tries += 1
if not success:
logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
print(f"Exited scrape_products for :{product_name}")
if __name__ == "__main__":
PRODUCTS = ["phone"]
MAX_RETRIES = 2
for product in PRODUCTS:
search_products(product, retries=MAX_RETRIES)
If you run this example, you'll get an error instead of search results. Amazon will continue to block us because we appear abnormal. We'll address this later on when we add proxy support to our scraper.
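If you'd like to confirm the block yourself, a quick look at the raw response (just a sketch; the exact status code and page content will vary) is usually enough:
import requests

resp = requests.get("https://www.amazon.com/s?k=phone")
print(resp.status_code)   # often 503 or another non-200 code for an unproxied scraper
print(resp.text[:200])    # frequently a CAPTCHA / "robot check" page instead of results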
Step 2: Add Pagination
Now that we can parse a page, let's add pagination into our parsing function. Pagination gives us the ability to control our result batches. If we want page 1, fetch page 1. If we want page 2, fetch page 2... and so on and so forth.
The code example below is almost exactly the same as before. The major difference: we have a page_number added to both our function arguments and our URL.
import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
def search_products(product_name: str, page_number=1, retries=3):
tries = 0
success = False
while tries < retries and not success:
try:
url = f"https://www.amazon.com/s?k={product_name}&page={page_number}"
resp = requests.get(url)
if resp.status_code == 200:
logger.info("Successfully fetched page")
soup = BeautifulSoup(resp.text, "html.parser")
divs = soup.find_all("div")
last_title = ""
for div in divs:
parsable = True if div is not None else False
h2 = div.find("h2")
if h2 and h2.text.strip() and h2.text.strip() != last_title and parsable:
title = h2.text.strip()
a = h2.find("a")
product_url = a.get("href") if a else ""
ad_status = False
if "sspa" in product_url:
ad_status = True
asin = div.get("data-asin")
symbol_element = div.find("span", class_="a-price-symbol")
symbol_presence = symbol_element.text if symbol_element else None
if symbol_presence is not None:
pricing_unit = symbol_presence
prices = div.find_all("span", class_="a-offscreen")
rating_element = div.find("span", class_="a-icon-alt")
rating_present = rating_element.text[0:3] if rating_element else "0.0"
rating = float(rating_present)
price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
price = float(price_present) if price_present else 0.0
real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
if symbol_presence and rating_present and price_present:
product = {
"name": asin,
"title": title,
"url": product_url,
"is_ad": ad_status,
"pricing_unit": pricing_unit,
"price": price,
"real_price": real_price,
"rating": rating
}
print(product)
last_title = title
else:
continue
success = True
else:
raise Exception(f"Failed to scrape the page, Status Code {resp.status_code}, tries left: {retries-tries}")
except Exception as e:
logger.warning(f"Failed to scrape page, {e}")
tries += 1
if not success:
logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
print(f"Exited scrape_products for :{product_name}")
if __name__ == "__main__":
PRODUCTS = ["phone"]
MAX_RETRIES = 2
for product in PRODUCTS:
search_products(product, retries=MAX_RETRIES)
As you can see above, not much has changed at all in our code. Our function now takes a page_number and inserts it into our URL.
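For example, to pull page 2 of the phone results with the function above (a hypothetical call, assuming the code is in the same file):
# Fetch the second page of results for "phone"
search_products("phone", page_number=2, retries=3)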
Step 3: Storing the Scraped Data
Now that our crawler can choose a page to scrape, it's time to give it the ability to store our data. In this section, we'll add a couple of classes to do just that: ProductData and DataPipeline.
Here is our updated code example.
import requests
from bs4 import BeautifulSoup
import logging, os, time
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
name: str = ""
title: str = ""
url: str = ""
is_ad: bool = False
pricing_unit: str = ""
price: float = None
real_price: float = None
rating: float = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename='', storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def search_products(product_name: str, page_number=1, retries=3, data_pipeline=None):
tries = 0
success = False
while tries < retries and not success:
try:
url = f"https://www.amazon.com/s?k={product_name}&page={page_number}"
print(url)
resp = requests.get(url)
if resp.status_code == 200:
logger.info("Successfully fetched page")
soup = BeautifulSoup(resp.text, "html.parser")
bad_divs = soup.find_all("div", class_="AdHolder")
for bad_div in bad_divs:
bad_div.decompose()
divs = soup.find_all("div")
last_title = ""
for div in divs:
parsable = True if div is not None else False
h2 = div.find("h2")
if h2 and h2.text.strip() and parsable:
title = h2.text.strip()
a = h2.find("a")
product_url = a.get("href") if a else ""
ad_status = False
if "sspa" in product_url:
ad_status = True
asin = div.get("data-asin")
symbol_element = div.find("span", class_="a-price-symbol")
symbol_presence = symbol_element.text if symbol_element else None
if symbol_presence is not None:
pricing_unit = symbol_presence
prices = div.find_all("span", class_="a-offscreen")
rating_element = div.find("span", class_="a-icon-alt")
rating_present = rating_element.text[0:3] if rating_element else "0.0"
print(rating_present)
print(title)
rating = float(rating_present)
price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
price = float(price_present) if price_present else 0.0
real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
if symbol_presence and rating_present and price_present:
product = ProductData(
name=asin,
title=title,
url=product_url,
is_ad=ad_status,
pricing_unit=pricing_unit,
price=price,
real_price=real_price,
rating=rating
)
data_pipeline.add_data(product)
last_title = title
else:
continue
success = True
else:
raise Exception(f"Failed to scrape the page, Status Code {resp.status_code}, tries left: {retries-tries}")
except Exception as e:
logger.warning(f"Failed to scrape page, {e}")
tries += 1
if not success:
logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
print(f"Exited scrape_products for :{product_name}")
if __name__ == "__main__":
PRODUCTS = ["phone"]
MAX_RETRIES = 2
for product in PRODUCTS:
product_pipeline = DataPipeline(csv_filename=f"{product}.csv")
search_products(product, retries=MAX_RETRIES, data_pipeline=product_pipeline)
product_pipeline.close_pipeline()
In the example above, we add our ProductData class to hold individual product data. We add a DataPipeline as well.
Our DataPipeline does all the heavy lifting of removing duplicates and saving our information to a CSV file.
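To see how these two pieces fit together in isolation, here's a small, hypothetical usage sketch that assumes both classes above are already defined:
# Hypothetical standalone usage of ProductData + DataPipeline
pipeline = DataPipeline(csv_filename="example.csv", storage_queue_limit=10)

product = ProductData(
    name="B000000000",           # placeholder ASIN
    title="Example Phone ",      # trailing space is stripped by check_string_fields()
    url="/example/dp/B000000000",
    is_ad=False,
    pricing_unit="$",
    price=99.99,
    real_price=129.99,
    rating=4.5
)

pipeline.add_data(product)       # queued; items with a repeated ASIN are dropped
pipeline.add_data(product)       # logged as a duplicate and dropped
pipeline.close_pipeline()        # flushes anything left in the queue to example.csv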
Step 4: Adding Concurrency
When we added pagination earlier, we gave our crawler the ability to scrape different pages. Now that we can scrape a specific page and store its data, it's time to give our crawler the power to scrape a bunch of pages at once. With concurrency, we can do exactly that.
Here is our threaded_search() function.
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
pages = list(range(1, pages+1))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(
search_products,
[product_name] * len(pages),
pages,
[location] * len(pages),
[retries] * len(pages),
[search_pipeline] * len(pages)
)
search_pipeline.close_pipeline()
We use ThreadPoolExecutor to manage our threads. This function will use 5 threads to perform our searches by default, so we'll have a maximum of 5 searches going simultaneously.
Here is our updated code. We also added a location argument to search_products(). While we don't use the location in this example, we'll be using it in the next section when we add proxy support.
import requests
from bs4 import BeautifulSoup
import logging, os, time
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
name: str = ""
title: str = ""
url: str = ""
is_ad: bool = False
pricing_unit: str = ""
price: float = None
real_price: float = None
rating: float = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename='', storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
tries = 0
success = False
while tries < retries and not success:
try:
url = f"https://www.amazon.com/s?k={product_name}&page={page_number}"
print(url)
resp = requests.get(url)
if resp.status_code == 200:
logger.info("Successfully fetched page")
soup = BeautifulSoup(resp.text, "html.parser")
bad_divs = soup.find_all("div", class_="AdHolder")
for bad_div in bad_divs:
bad_div.decompose()
divs = soup.find_all("div")
last_title = ""
for div in divs:
parsable = True if div is not None else False
h2 = div.find("h2")
if h2 and h2.text.strip() and parsable:
title = h2.text.strip()
a = h2.find("a")
product_url = a.get("href") if a else ""
ad_status = False
if "sspa" in product_url:
ad_status = True
asin = div.get("data-asin")
symbol_element = div.find("span", class_="a-price-symbol")
symbol_presence = symbol_element.text if symbol_element else None
if symbol_presence is not None:
pricing_unit = symbol_presence
prices = div.find_all("span", class_="a-offscreen")
rating_element = div.find("span", class_="a-icon-alt")
rating_present = rating_element.text[0:3] if rating_element else "0.0"
print(rating_present)
print(title)
rating = float(rating_present)
price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
price = float(price_present) if price_present else 0.0
real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
if symbol_presence and rating_present and price_present:
product = ProductData(
name=asin,
title=title,
url=product_url,
is_ad=ad_status,
pricing_unit=pricing_unit,
price=price,
real_price=real_price,
rating=rating
)
data_pipeline.add_data(product)
last_title = title
else:
continue
success = True
else:
raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}")
except Exception as e:
logger.warning(f"Failed to scrape page, {e}")
tries += 1
if not success:
logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
print(f"Exited scrape_products for :{product_name}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
pages = list(range(1, pages+1))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(
search_products,
[product_name] * len(pages),
pages,
[location] * len(pages),
[retries] * len(pages),
[search_pipeline] * len(pages)
)
search_pipeline.close_pipeline()
if __name__ == "__main__":
PRODUCTS = ["phone"]
MAX_RETRIES = 2
PAGES = 5
MAX_THREADS = 3
LOCATION = "us"
for product in PRODUCTS:
threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
If you run the code above, you'll still get blocked. To an Amazon server, our scraper already looked a bit abnormal. Now it's not only abnormal, it's exponentially faster than it was before. Let's add proxy support in the next section.
Step 5: Bypassing Anti-Bots
We're almost ready for our production run. It's time to add proxy support so Amazon stops blocking our crawler. We really only need to add one function here, get_scrapeops_url().
This function takes in a regular URL and uses basic string formatting to convert it into a URL that uses the ScrapeOps API. Take a look below:
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
This function takes our URL and formats it into a proxied URL. Here is our updated code below.
import requests
from bs4 import BeautifulSoup
import logging, os, time
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
name: str = ""
title: str = ""
url: str = ""
is_ad: bool = False
pricing_unit: str = ""
price: float = None
real_price: float = None
rating: float = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename='', storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
tries = 0
success = False
while tries < retries and not success:
try:
url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location)
print(url)
resp = requests.get(url)
if resp.status_code == 200:
logger.info("Successfully fetched page")
soup = BeautifulSoup(resp.text, "html.parser")
bad_divs = soup.find_all("div", class_="AdHolder")
for bad_div in bad_divs:
bad_div.decompose()
divs = soup.find_all("div")
last_title = ""
for div in divs:
parsable = True if div is not None else False
h2 = div.find("h2")
if h2 and h2.text.strip() and parsable:
title = h2.text.strip()
a = h2.find("a")
product_url = a.get("href") if a else ""
ad_status = False
if "sspa" in product_url:
ad_status = True
asin = div.get("data-asin")
symbol_element = div.find("span", class_="a-price-symbol")
symbol_presence = symbol_element.text if symbol_element else None
if symbol_presence is not None:
pricing_unit = symbol_presence
prices = div.find_all("span", class_="a-offscreen")
rating_element = div.find("span", class_="a-icon-alt")
rating_present = rating_element.text[0:3] if rating_element else "0.0"
print(rating_present)
print(title)
rating = float(rating_present)
price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
price = float(price_present) if price_present else 0.0
real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
if symbol_presence and rating_present and price_present:
product = ProductData(
name=asin,
title=title,
url=product_url,
is_ad=ad_status,
pricing_unit=pricing_unit,
price=price,
real_price=real_price,
rating=rating
)
data_pipeline.add_data(product)
last_title = title
else:
continue
success = True
else:
raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}")
except Exception as e:
logger.warning(f"Failed to scrape page, {e}")
tries += 1
if not success:
logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
print(f"Exited scrape_products for :{product_name}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
pages = list(range(1, pages+1))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(
search_products,
[product_name] * len(pages),
pages,
[location] * len(pages),
[retries] * len(pages),
[search_pipeline] * len(pages)
)
search_pipeline.close_pipeline()
if __name__ == "__main__":
PRODUCTS = ["phone"]
MAX_RETRIES = 2
PAGES = 5
MAX_THREADS = 3
LOCATION = "us"
for product in PRODUCTS:
threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
Now that we can get past anti-bots, we're ready to move on to our production run.
Step 6: Production Run
Time for our production run. Take a look at our main function below.
if __name__ == "__main__":
PRODUCTS = ["phone"]
MAX_RETRIES = 2
PAGES = 10
MAX_THREADS = 3
LOCATION = "us"
for product in PRODUCTS:
threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
You can change any of the following constants to change your results:
- PRODUCTS
- MAX_RETRIES
- PAGES
- MAX_THREADS
- LOCATION
To run this scraper, replace the filename below with whatever you chose to name yours.
python crawler-proxy.py
Our final scraper generated a report on 10 pages full of phones in 26 seconds.
Here is the report it created:
Build An Amazon Product Scraper
Now it's time to build a scraper that looks up individual products. From these individual product pages, we need to extract feature bullets, prices, and images. This way, if you're interested in a product, simply pull up your report for that product!
Step 1: Create Simple Amazon Product Page Data Parser
Here's a parsing function that retrieves data from a product page. We're not ready to add it into our scraper yet, because we first need the ability to read the CSV we created earlier.
def parse_product(product_object, location="us", retries=3):
url = product_object["url"]
tries = 0
success = False
product_url = f"https://www.amazon.com/{url}"
url_array = product_url.split("/")
title = url_array[-4]
asin = url_array[-2]
print("asin", asin, title)
while tries <= retries and not success:
try:
resp = requests.get(product_url)
if resp.status_code == 200:
print("Content Fetched")
soup = BeautifulSoup(resp.text, "html.parser")
#find all the images
spans = soup.find_all("span")
images_to_save = []
for span in spans:
image_array = span.find_all("span")
for item in image_array:
image_span = item.find("span")
if image_span is not None:
images = image_span.find_all("img")
for image in images:
image_link = image.get("src")
if "https://m.media-amazon.com/images/" in image_link not in images_to_save:
images_to_save.append(image_link)
features = []
feature_bullets = soup.find_all("li", class_="a-spacing-mini")
for feature in feature_bullets:
text = feature.find("span").text
if text not in features:
features.append(text)
price_symbol = soup.find("span", class_="a-price-symbol").text
whole_number = soup.find("span", class_="a-price-whole").text.replace(",", "").replace(".", "")
decimal = soup.find("span", class_="a-price-fraction").text
price = float(f"{whole_number}.{decimal}")
item_data = {
"name": asin,
"title": title,
"url": product_url,
"pricing_unit": price_symbol,
"price": price,
"feature_1": features[0] if len(features) > 0 else "n/a",
"feature_2": features[1] if len(features) > 1 else "n/a",
"feature_3": features[2] if len(features) > 2 else "n/a",
"feature_4": features[3] if len(features) > 3 else "n/a",
"images_1": images_to_save[0] if len(images_to_save) > 0 else "n/a",
"images_2": images_to_save[1] if len(images_to_save) > 1 else "n/a",
"images_3": images_to_save[2] if len(images_to_save) > 2 else "n/a",
"images_4": images_to_save[3] if len(images_to_save) > 3 else "n/a"
}
print("Product Page Data:", item_data)
success = True
else:
raise Exception(f"Failed response from server, status code: {resp.status_code}")
except Exception as e:
logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
tries += 1
In the above function, we pull the features and item images from the product page. These will be used in the individual report we generate for each product.
Step 2: Loading URLs To Scrape
Now it's time to give our code the ability to run. In order to parse these items, we need to read them from a CSV file and then pass them into our parse function. The code example below adds a threaded_item_lookup() function.
At the moment, this function does not use threading. We just have a for loop as a placeholder. This function reads the CSV file and then passes each object from the file into parse_product().
import requests
from bs4 import BeautifulSoup
import logging, os, time
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
name: str = ""
title: str = ""
url: str = ""
is_ad: bool = False
pricing_unit: str = ""
price: float = None
real_price: float = None
rating: float = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename='', storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
tries = 0
success = False
while tries < retries and not success:
try:
url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location)
resp = requests.get(url)
if resp.status_code == 200:
logger.info("Successfully fetched page")
soup = BeautifulSoup(resp.text, "html.parser")
bad_divs = soup.find_all("div", class_="AdHolder")
for bad_div in bad_divs:
bad_div.decompose()
divs = soup.find_all("div")
last_title = ""
for div in divs:
parsable = True if div is not None else False
h2 = div.find("h2")
if h2 and h2.text.strip() and parsable:
title = h2.text.strip()
a = h2.find("a")
product_url = a.get("href") if a else ""
ad_status = False
if "sspa" in product_url:
ad_status = True
asin = div.get("data-asin")
symbol_element = div.find("span", class_="a-price-symbol")
symbol_presence = symbol_element.text if symbol_element else None
if symbol_presence is not None:
pricing_unit = symbol_presence
prices = div.find_all("span", class_="a-offscreen")
rating_element = div.find("span", class_="a-icon-alt")
rating_present = rating_element.text[0:3] if rating_element else "0.0"
print(rating_present)
print(title)
rating = float(rating_present)
price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
price = float(price_present) if price_present else 0.0
real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
if symbol_presence and rating_present and price_present:
product = ProductData(
name=asin,
title=title,
url=product_url,
is_ad=ad_status,
pricing_unit=pricing_unit,
price=price,
real_price=real_price,
rating=rating
)
data_pipeline.add_data(product)
last_title = title
else:
continue
success = True
else:
raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}")
except Exception as e:
logger.warning(f"Failed to scrape page, {e}")
tries += 1
if not success:
logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
print(f"Exited scrape_products for :{product_name}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
pages = list(range(1, pages+1))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(
search_products,
[product_name] * len(pages),
pages,
[location] * len(pages),
[retries] * len(pages),
[search_pipeline] * len(pages)
)
search_pipeline.close_pipeline()
def parse_product(product_object, location="us", retries=3):
url = product_object["url"]
tries = 0
success = False
product_url = f"https://www.amazon.com/{url}"
url_array = product_url.split("/")
title = url_array[-4]
asin = url_array[-2]
print("asin", asin, title)
while tries <= retries and not success:
try:
resp = requests.get(product_url)
if resp.status_code == 200:
print("Content Fetched")
soup = BeautifulSoup(resp.text, "html.parser")
#find all the images
spans = soup.find_all("span")
images_to_save = []
for span in spans:
image_array = span.find_all("span")
for item in image_array:
image_span = item.find("span")
if image_span is not None:
images = image_span.find_all("img")
for image in images:
image_link = image.get("src")
if "https://m.media-amazon.com/images/" in image_link not in images_to_save:
images_to_save.append(image_link)
features = []
feature_bullets = soup.find_all("li", class_="a-spacing-mini")
for feature in feature_bullets:
text = feature.find("span").text
if text not in features:
features.append(text)
price_symbol = soup.find("span", class_="a-price-symbol").text
whole_number = soup.find("span", class_="a-price-whole").text.replace(",", "").replace(".", "")
decimal = soup.find("span", class_="a-price-fraction").text
price = float(f"{whole_number}.{decimal}")
item_data = {
"name": asin,
"title": title,
"url": product_url,
"pricing_unit": price_symbol,
"price": price,
"feature_1": features[0] if len(features) > 0 else "n/a",
"feature_2": features[1] if len(features) > 1 else "n/a",
"feature_3": features[2] if len(features) > 2 else "n/a",
"feature_4": features[3] if len(features) > 3 else "n/a",
"images_1": images_to_save[0] if len(images_to_save) > 0 else "n/a",
"images_2": images_to_save[1] if len(images_to_save) > 1 else "n/a",
"images_3": images_to_save[2] if len(images_to_save) > 2 else "n/a",
"images_4": images_to_save[3] if len(images_to_save) > 3 else "n/a"
}
print("Product Page Data:", item_data)
success = True
else:
raise Exception(f"Failed response from server, status code: {resp.status_code}")
except Exception as e:
logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
tries += 1
def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
with open(csv_filename) as csvfile:
reader = list(csv.DictReader(csvfile))
for product_object in reader:
parse_product(product_object, location=location, retries=retries)
if __name__ == "__main__":
PRODUCTS = ["phone"]
AGGREGATE_PRODUCTS = []
MAX_RETRIES = 2
PAGES = 1
MAX_THREADS = 3
LOCATION = "us"
for product in PRODUCTS:
threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
filename = f"{product}.csv"
AGGREGATE_PRODUCTS.append(filename)
for product in AGGREGATE_PRODUCTS:
threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS)
You might be wondering why we use a separate field for each bullet point and image. The reason for this is actually pretty simple: the csv module has a very difficult time managing arrays, and once we convert this item into a @dataclass, we want each field to hold a single fixed value rather than a list of variable size. In Python, arrays (lists) are mutable by default.
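Here's a small sketch of that flattening pattern on its own (the same idea used in parse_product() above):
features = ["6.1-inch display", "128 GB storage"]   # example list scraped from a page

# Flatten a variable-length list into four fixed, CSV-friendly fields
flattened = {
    f"feature_{i}": features[i - 1] if len(features) >= i else "n/a"
    for i in range(1, 5)
}
print(flattened)
# {'feature_1': '6.1-inch display', 'feature_2': '128 GB storage', 'feature_3': 'n/a', 'feature_4': 'n/a'}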
Step 3: Storing the Scraped Data
Similar to how we stored our data with the results crawler, we're going to be using a custom class to hold our data. We'll then pass this object into our DataPipeline to both filter and store our data. The code below adds a ProductPageData class and passes it into the pipeline.
import requests
from bs4 import BeautifulSoup
import logging, os, time
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
name: str = ""
title: str = ""
url: str = ""
is_ad: bool = False
pricing_unit: str = ""
price: float = None
real_price: float = None
rating: float = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ProductPageData:
name: str = ""
title: str = ""
url: str = ""
pricing_unit: str = ""
price: float = None
feature_1: str = ""
feature_2: str = ""
feature_3: str = ""
feature_4: str = ""
images_1: str = ""
images_2: str = ""
images_3: str = ""
images_4: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename='', storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
tries = 0
success = False
while tries < retries and not success:
try:
url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location)
resp = requests.get(url)
if resp.status_code == 200:
logger.info("Successfully fetched page")
soup = BeautifulSoup(resp.text, "html.parser")
bad_divs = soup.find_all("div", class_="AdHolder")
for bad_div in bad_divs:
bad_div.decompose()
divs = soup.find_all("div")
last_title = ""
for div in divs:
parsable = True if div is not None else False
h2 = div.find("h2")
if h2 and h2.text.strip() and parsable:
title = h2.text.strip()
a = h2.find("a")
product_url = a.get("href") if a else ""
ad_status = False
if "sspa" in product_url:
ad_status = True
asin = div.get("data-asin")
symbol_element = div.find("span", class_="a-price-symbol")
symbol_presence = symbol_element.text if symbol_element else None
if symbol_presence is not None:
pricing_unit = symbol_presence
prices = div.find_all("span", class_="a-offscreen")
rating_element = div.find("span", class_="a-icon-alt")
rating_present = rating_element.text[0:3] if rating_element else "0.0"
print(rating_present)
print(title)
rating = float(rating_present)
price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
price = float(price_present) if price_present else 0.0
real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
if symbol_presence and rating_present and price_present:
product = ProductData(
name=asin,
title=title,
url=product_url,
is_ad=ad_status,
pricing_unit=pricing_unit,
price=price,
real_price=real_price,
rating=rating
)
data_pipeline.add_data(product)
last_title = title
else:
continue
success = True
else:
raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}")
except Exception as e:
logger.warning(f"Failed to scrape page, {e}")
tries += 1
if not success:
logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
print(f"Exited scrape_products for :{product_name}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
pages = list(range(1, pages+1))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(
search_products,
[product_name] * len(pages),
pages,
[location] * len(pages),
[retries] * len(pages),
[search_pipeline] * len(pages)
)
search_pipeline.close_pipeline()
def parse_product(product_object, location="us", retries=3):
url = product_object["url"]
tries = 0
success = False
product_url = f"https://www.amazon.com/{url}"
url_array = product_url.split("/")
title = url_array[-4]
product_pipeline = DataPipeline(csv_filename=f"{title}.csv")
asin = url_array[-2]
while tries <= retries and not success:
try:
resp = requests.get(product_url)
if resp.status_code == 200:
soup = BeautifulSoup(resp.text, "html.parser")
#find all the images
spans = soup.find_all("span")
images_to_save = []
for span in spans:
image_array = span.find_all("span")
for item in image_array:
image_span = item.find("span")
if image_span is not None:
images = image_span.find_all("img")
for image in images:
image_link = image.get("src")
if "https://m.media-amazon.com/images/" in image_link not in images_to_save:
images_to_save.append(image_link)
features = []
feature_bullets = soup.find_all("li", class_="a-spacing-mini")
for feature in feature_bullets:
text = feature.find("span").text
if text not in features:
features.append(text)
price_symbol = soup.find("span", class_="a-price-symbol").text
whole_number = soup.find("span", class_="a-price-whole").text.replace(",", "").replace(".", "")
decimal = soup.find("span", class_="a-price-fraction").text
price = float(f"{whole_number}.{decimal}")
item_data = ProductPageData(
name=asin,
title=title,
url=product_url,
pricing_unit=price_symbol,
price=price,
feature_1=features[0] if len(features) > 0 else "n/a",
feature_2=features[1] if len(features) > 1 else "n/a",
feature_3=features[2] if len(features) > 2 else "n/a",
feature_4=features[3] if len(features) > 3 else "n/a",
images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a",
images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a",
images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a",
images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a"
)
product_pipeline.add_data(item_data)
product_pipeline.close_pipeline()
success = True
else:
raise Exception(f"Failed response from server, status code: {resp.status_code}")
except Exception as e:
logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
tries += 1
def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
with open(csv_filename) as csvfile:
reader = list(csv.DictReader(csvfile))
for product_object in reader:
parse_product(product_object, location=location, retries=retries)
if __name__ == "__main__":
PRODUCTS = ["phone"]
AGGREGATE_PRODUCTS = []
MAX_RETRIES = 2
PAGES = 1
MAX_THREADS = 3
LOCATION = "us"
for product in PRODUCTS:
threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
filename = f"{product}.csv"
AGGREGATE_PRODUCTS.append(filename)
for product in AGGREGATE_PRODUCTS:
threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
You may have also noticed that inside parse_product(), we open an individual pipeline for each product. This way, we generate a separate report for each of the products we scraped earlier with the crawler.
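Each of those per-product reports is named after the slug in the product URL. As a quick illustration, here is a minimal sketch of the URL-splitting logic used above, with a made-up URL; real Amazon URLs vary (sponsored links in particular add extra segments), so the indices are an assumption that matches the format this scraper handles.
# Minimal sketch: deriving the per-product CSV name and ASIN from a product URL.
# The URL below is a made-up example.
product_url = "https://www.amazon.com/Example-Phone-128GB/dp/B000000000/ref=sr_1_1"
url_array = product_url.split("/")
title = url_array[-4]  # "Example-Phone-128GB" -> becomes "Example-Phone-128GB.csv"
asin = url_array[-2]   # "B000000000" -> Amazon's unique product identifier
print(f"{title}.csv", asin)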
Step 4: Adding Concurrency
Now, we're going to add concurrency so we can parse multiple products at once. This is very similar to when we added concurrency to the crawler earlier.
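If the multi-iterable call to executor.map() looks unfamiliar, here is a tiny, self-contained refresher of the pattern; the worker function and values are made up purely for illustration.
from concurrent.futures import ThreadPoolExecutor

# Refresher: executor.map() takes one iterable per positional argument of the worker,
# so repeated-value lists are how we pass a "constant" argument to every call.
def worker(name, page, location):
    return f"{name} page {page} ({location})"

pages = [1, 2, 3]
with ThreadPoolExecutor(max_workers=3) as executor:
    for result in executor.map(worker, ["phone"] * len(pages), pages, ["us"] * len(pages)):
        print(result)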
Take a look at the function below; it's the finished version of threaded_item_lookup():
def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
with open(csv_filename) as csvfile:
reader = list(csv.DictReader(csvfile))
with ThreadPoolExecutor(max_workers=threads) as executor:
executor.map(parse_product, reader, [location] * len(reader), [retries] * len(reader))
Aside from the small changes in this function, everything else remains the same. If you want to test this step on its own, a minimal driver is sketched below. In the next section, we'll add proxy support.
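This is just a quick, hedged sketch of how you might invoke the threaded lookup on the CSV the crawler produced; the filename phone.csv is an assumption based on the PRODUCTS list used in this guide.
# Hypothetical driver: run the threaded product lookup on an existing crawler CSV.
# Assumes "phone.csv" was produced by threaded_search() in the earlier step.
if __name__ == "__main__":
    threaded_item_lookup("phone.csv", location="us", retries=3, threads=3)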
Step 5: Bypassing Anti-Bots
As you learned earlier in this article, Amazon will block you if your scraper looks suspicious. Our scraper already looked abnormal; after adding concurrency, it looks even more abnormal.
In this example, we're going to change one line of code and make the entire thing work.
resp = requests.get(get_scrapeops_url(product_url, location=location))
In parse_product(), we simply convert our URL into a proxied one.
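To make that concrete, here is a rough sketch of the proxied URL that get_scrapeops_url() builds; the product URL is a made-up example, and the exact query string depends on your API key and target.
from urllib.parse import urlencode

# Illustrative only: the shape of the URL returned by get_scrapeops_url().
payload = {
    "api_key": "YOUR-SUPER-SECRET-API-KEY",
    "url": "https://www.amazon.com/dp/B000000000",  # hypothetical product URL
    "country": "us"
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-SUPER-SECRET-API-KEY&url=https%3A%2F%2Fwww.amazon.com%2Fdp%2FB000000000&country=us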
Here is the full code:
import requests
from bs4 import BeautifulSoup
import logging, os, time
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
name: str = ""
title: str = ""
url: str = "",
is_ad: bool = False,
pricing_unit: str = "",
price: float = None,
real_price: float = None,
rating: float = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ProductPageData:
name: str = ""
title: str = ""
url: str = "",
pricing_unit: str = "",
price: float = None,
feature_1: str = "",
feature_2: str = "",
feature_3: str = "",
feature_4: str = "",
images_1: str = "",
images_2: str = "",
images_3: str = "",
images_4: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename='', storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
tries = 0
success = False
while tries < retries and not success:
try:
url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location)
resp = requests.get(url)
if resp.status_code == 200:
logger.info("Successfully fetched page")
soup = BeautifulSoup(resp.text, "html.parser")
bad_divs = soup.find_all("div", class_="AdHolder")
for bad_div in bad_divs:
bad_div.decompose()
divs = soup.find_all("div")
last_title = ""
for div in divs:
parsable = div is not None
h2 = div.find("h2")
if parsable and h2 and h2.text.strip():
title = h2.text.strip()
a = h2.find("a")
product_url = a.get("href") if a else ""
ad_status = False
if "sspa" in product_url:
ad_status = True
asin = div.get("data-asin")
symbol_element = div.find("span", class_="a-price-symbol")
symbol_presence = symbol_element.text if symbol_element else None
if symbol_presence is not None:
pricing_unit = symbol_presence
prices = div.find_all("span", class_="a-offscreen")
rating_element = div.find("span", class_="a-icon-alt")
rating_present = rating_element.text[0:3] if rating_element else "0.0"
print(rating_present)
print(title)
rating = float(rating_present)
price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
price = float(price_present) if price_present else 0.0
real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
if symbol_presence and rating_present and price_present:
product = ProductData(
name=asin,
title=title,
url=product_url,
is_ad=ad_status,
pricing_unit=pricing_unit,
price=price,
real_price=real_price,
rating=rating
)
data_pipeline.add_data(product)
last_title = title
else:
continue
success = True
else:
raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}")
except Exception as e:
logger.warning(f"Failed to scrape page, {e}")
tries += 1
if not success:
logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
print(f"Exited scrape_products for :{product_name}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
pages = list(range(1, pages+1))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(
search_products,
[product_name] * len(pages),
pages,
[location] * len(pages),
[retries] * len(pages),
[search_pipeline] * len(pages)
)
search_pipeline.close_pipeline()
def parse_product(product_object, location="us", retries=3):
url = product_object["url"]
tries = 0
success = False
product_url = f"https://www.amazon.com/{url}"
url_array = product_url.split("/")
title = url_array[-4]
product_pipeline = DataPipeline(csv_filename=f"{title}.csv")
asin = url_array[-2]
while tries <= retries and not success:
try:
resp = requests.get(get_scrapeops_url(product_url, location=location))
if resp.status_code == 200:
soup = BeautifulSoup(resp.text, "html.parser")
#find all the images
spans = soup.find_all("span")
images_to_save = []
for span in spans:
image_array = span.find_all("span")
for item in image_array:
image_span = item.find("span")
if image_span is not None:
images = image_span.find_all("img")
for image in images:
image_link = image.get("src")
if "https://m.media-amazon.com/images/" in image_link not in images_to_save:
images_to_save.append(image_link)
features = []
feature_bullets = soup.find_all("li", class_="a-spacing-mini")
for feature in feature_bullets:
text = feature.find("span").text
if text not in features:
features.append(text)
price_symbol = soup.find("span", class_="a-price-symbol").text
whole_number = soup.find("span", class_="a-price-whole").text.replace(",", "").replace(".", "")
decimal = soup.find("span", class_="a-price-fraction").text
price = float(f"{whole_number}.{decimal}")
item_data = ProductPageData(
name=asin,
title=title,
url=product_url,
pricing_unit=price_symbol,
price=price,
feature_1=features[0] if len(features) > 0 else "n/a",
feature_2=features[1] if len(features) > 1 else "n/a",
feature_3=features[2] if len(features) > 2 else "n/a",
feature_4=features[3] if len(features) > 3 else "n/a",
images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a",
images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a",
images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a",
images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a"
)
product_pipeline.add_data(item_data)
product_pipeline.close_pipeline()
success = True
else:
raise Exception(f"Failed response from server, status code: {resp.status_code}")
except Exception as e:
logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
tries += 1
return None
def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
with open(csv_filename) as csvfile:
reader = list(csv.DictReader(csvfile))
print(len(reader))
with ThreadPoolExecutor(max_workers=threads) as executor:
executor.map(parse_product, reader, [location] * len(reader), [retries] * len(reader))
if __name__ == "__main__":
PRODUCTS = ["phone"]
AGGREGATE_PRODUCTS = []
MAX_RETRIES = 2
PAGES = 1
MAX_THREADS = 3
LOCATION = "us"
for product in PRODUCTS:
threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
filename = f"{product}.csv"
AGGREGATE_PRODUCTS.append(filename)
for product in AGGREGATE_PRODUCTS:
threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Time for the production run. I'm going to change one constant, PAGES, to 20 and time the operation from start to finish. If you'd like different results, feel free to adjust any of the constants in the main block.
if __name__ == "__main__":
PRODUCTS = ["phone"]
AGGREGATE_PRODUCTS = []
MAX_RETRIES = 2
PAGES = 20
MAX_THREADS = 3
LOCATION = "us"
for product in PRODUCTS:
threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
filename = f"{product}.csv"
AGGREGATE_PRODUCTS.append(filename)
for product in AGGREGATE_PRODUCTS:
threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
If you look at the image below, our full run took 12 minutes and 29 seconds. That covered 287 individual items, or approximately 23 items per minute.
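If you want to sanity-check that figure, the throughput is just items divided by elapsed minutes; here is a quick sketch using the numbers from this run (your own timings will vary with page count, thread count, and network conditions).
# Throughput check using the numbers from this run.
items = 287
elapsed_minutes = 12 + 29 / 60            # 12 minutes, 29 seconds
print(round(items / elapsed_minutes, 1))  # roughly 23 items per minute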
Legal and Ethical Considerations
When scraping anything on the web, it's always important to respect the robots.txt and Terms of Service of the site you're scraping. For Amazon, you can check the links below.
Keep in mind that if you violate a site's terms, it can suspend or even delete your account, and you should never expose private data that you scrape.
Public data (data that is not gated behind a login) is generally fair game to scrape. If the data you want sits behind a login, always review the site's terms before building your scraper to avoid legal trouble.
If you are concerned about the legality of your scraping project, it is best to consult an attorney.
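As a practical first step, you can check a site's robots.txt programmatically before crawling it. Below is a minimal sketch using Python's built-in urllib.robotparser; the user agent and path are illustrative examples only, and the rules themselves can change at any time.
from urllib.robotparser import RobotFileParser

# Minimal sketch: ask Amazon's robots.txt whether a given path may be fetched.
parser = RobotFileParser("https://www.amazon.com/robots.txt")
parser.read()
print(parser.can_fetch("*", "https://www.amazon.com/s?k=phone"))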
Conclusion
You now know how to build both a search results crawler and a product scraper. You should have a decent understanding of how to use both Requests and BeautifulSoup, and you can now squeeze extra performance out of your scrapers with concurrency. Take these new skills and go build something!
Check the links below to learn more about the tech stack used in this article.
More Python Web Scraping Guides
Wanna level up your scraping skills? Here at ScrapeOps, we have a ton of learning resources. Check out our Python Web Scraping Playbook or take a look at some of the articles below: