Skip to main content

Scrape Amazon With Python Requests and BeautifulSoup

How to Scrape Amazon With Python Requests

Amazon is the largest online retailer in the world and one of the largest overall retailers in the world.

Whether you seek to track product prices, analyze customer reviews, or monitor competitors, extracting information from Amazon can provide valuable insights and opportunities.

In this guide, we'll take you through how to scrape Amazon using Python Requests and BeautifulSoup.

If you prefer to follow along with a video then check out the video tutorial version here:

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Amazon

If you are looking for a production-ready Amazon scraper, follow the script below:

import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One listing taken from an Amazon search-results page.

    After construction every string field is normalised: an empty string
    becomes ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace.
    """

    # NOTE: no trailing commas after the defaults -- a trailing comma would
    # turn each default into a one-element tuple such as ("",).
    name: str = ""
    title: str = ""
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: float = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        for spec in fields(self):
            value = getattr(self, spec.name)
            # Non-string fields (bool, float, None) are left untouched.
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())

@dataclass
class ProductPageData:
    """Details scraped from a single Amazon product page.

    After construction every string field is normalised: an empty string
    becomes ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace.
    """

    # NOTE: no trailing commas after the defaults -- a trailing comma would
    # turn each default into a one-element tuple such as ("",).
    name: str = ""
    title: str = ""
    url: str = ""
    pricing_unit: str = ""
    price: float = None
    feature_1: str = ""
    feature_2: str = ""
    feature_3: str = ""
    feature_4: str = ""
    images_1: str = ""
    images_2: str = ""
    images_3: str = ""
    images_4: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        for spec in fields(self):
            value = getattr(self, spec.name)
            # Non-string fields (float, None) are left untouched.
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())


class DataPipeline:
    """Queue scraped records, drop repeats by ``name``, and append them to a CSV file.

    Records must be dataclass instances with a ``name`` attribute. A
    re-entrant lock serialises queue updates and file writes so one pipeline
    can be shared between worker threads.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        # Imported here so the class does not rely on the module's import block.
        import threading

        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False
        # Re-entrant: add_data() holds the lock while it calls save_to_csv().
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append all queued records to ``csv_filename`` and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        with self._lock:
            # Return before touching the flag so an empty flush cannot leave
            # csv_file_open stuck at True.
            if not self.storage_queue:
                return
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()

            self.csv_file_open = True
            try:
                keys = [spec.name for spec in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)

                    if not file_exists:
                        writer.writeheader()

                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset the flag even when the write raises.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.append(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued.

        The lock makes an in-progress write finish first, which replaces the
        original ``time.sleep(3)`` wait (``time`` was never imported).
        """
        self.save_to_csv()

def get_scrapeops_url(url, location="us"):
    """Return a ScrapeOps proxy URL that fetches ``url`` from country ``location``."""
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return "https://proxy.scrapeops.io/v1/?" + query_string


def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page through the ScrapeOps proxy.

    Every listing with a title and a price symbol becomes a ``ProductData``
    record that is handed to ``data_pipeline``.

    Args:
        product_name: Search keyword(s).
        page_number: Results page to request.
        location: Country code passed to ``get_scrapeops_url``.
        retries: Maximum number of attempts for this page.
        data_pipeline: Object exposing ``add_data``; when ``None`` the
            records are printed instead of stored.
    """
    # Encode the query so keywords containing spaces or '&' stay intact.
    query = urlencode({"k": product_name, "page": page_number})
    url = get_scrapeops_url(f"https://www.amazon.com/s?{query}", location=location)
    tries = 0
    success = False

    while tries < retries and not success:
        try:
            resp = requests.get(url)
            if resp.status_code != 200:
                # tries is incremented in the handler, so this attempt is
                # subtracted here to report the real remainder.
                raise Exception(
                    f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, "
                    f"tries left: {retries - tries - 1}"
                )
            logger.info("Successfully fetched page")
            soup = BeautifulSoup(resp.text, "html.parser")

            # Remove ad containers before walking the listings.
            for bad_div in soup.find_all("div", class_="AdHolder"):
                bad_div.decompose()

            # NOTE(review): every <div> is visited, so enclosing divs can
            # yield the same <h2>; the pipeline drops repeats by name.
            for div in soup.find_all("div"):
                h2 = div.find("h2")
                title = h2.text.strip() if h2 is not None else ""
                if not title:
                    continue

                symbol_element = div.find("span", class_="a-price-symbol")
                pricing_unit = symbol_element.text if symbol_element is not None else ""
                # Only priced listings are stored; skipping here also stops a
                # previous listing's currency symbol from being reused.
                if not pricing_unit:
                    continue

                a = h2.find("a")
                # href can be absent, so fall back to "" before the "sspa" test.
                product_url = (a.get("href") or "") if a is not None else ""
                ad_status = "sspa" in product_url
                asin = div.get("data-asin")
                prices = div.find_all("span", class_="a-offscreen")

                rating_element = div.find("span", class_="a-icon-alt")
                rating_text = rating_element.text[0:3] if rating_element is not None else "0.0"
                price_text = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
                if not rating_text or not price_text:
                    continue

                try:
                    rating = float(rating_text)
                    price = float(price_text)
                    if len(prices) > 1:
                        real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", ""))
                    else:
                        real_price = price
                except ValueError:
                    # One unparsable listing must not abort the whole page.
                    logger.warning(f"Skipping listing with unparsable numbers: {title}")
                    continue

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )

                if data_pipeline is not None:
                    data_pipeline.add_data(product)
                else:
                    print(product)

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")

    print(f"Exited search_products for :{product_name}")

def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1 through ``pages`` for ``product_name`` on a thread pool.

    All pages share one ``DataPipeline`` writing to ``<product_name>.csv``;
    it is flushed once the pool has finished.
    """
    pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One task per page, submitted up front.
        for page_number in range(1, pages + 1):
            pool.submit(search_products, product_name, page_number, location, retries, pipeline)

    pipeline.close_pipeline()


def parse_product(product_object, location="us", retries=3):
    """Scrape one product page and write its details to ``<title>.csv``.

    Args:
        product_object: Mapping with a ``"url"`` key holding the product
            link (a row of the search-results CSV).
        location: Country code passed to ``get_scrapeops_url``.
        retries: Number of extra attempts after the first one fails.

    Returns:
        None. The record is written through a ``DataPipeline``.
    """
    href = product_object["url"]
    tries = 0
    success = False

    # Strip leading slashes so the joined URL has no "//" after the host;
    # an already absolute link is used as-is.
    if href.startswith("http"):
        product_url = href
    else:
        product_url = f"https://www.amazon.com/{href.lstrip('/')}"

    # NOTE(review): assumes a ".../<title>/dp/<asin>/<ref>" path layout.
    url_array = product_url.split("/")
    title = url_array[-4]
    asin = url_array[-2]

    product_pipeline = DataPipeline(csv_filename=f"{title}.csv")

    # "<=" gives one initial attempt plus `retries` retries.
    while tries <= retries and not success:
        try:
            resp = requests.get(get_scrapeops_url(product_url, location=location))
            if resp.status_code != 200:
                raise Exception(f"Failed response from server, status code: {resp.status_code}")

            soup = BeautifulSoup(resp.text, "html.parser")

            # Collect product images found inside nested <span> elements.
            images_to_save = []
            for span in soup.find_all("span"):
                for item in span.find_all("span"):
                    image_span = item.find("span")
                    if image_span is None:
                        continue
                    for image in image_span.find_all("img"):
                        image_link = image.get("src")
                        # src may be missing; keep each product image once.
                        if (
                            image_link
                            and "https://m.media-amazon.com/images/" in image_link
                            and image_link not in images_to_save
                        ):
                            images_to_save.append(image_link)

            # Collect the distinct feature bullet texts.
            features = []
            for feature in soup.find_all("li", class_="a-spacing-mini"):
                feature_span = feature.find("span")
                if feature_span is None:
                    continue
                text = feature_span.text
                if text not in features:
                    features.append(text)

            # A missing price element raises AttributeError, which the
            # handler below treats as a failed attempt.
            price_symbol = soup.find("span", class_="a-price-symbol").text
            whole_number = soup.find("span", class_="a-price-whole").text.replace(",", "").replace(".", "")
            decimal = soup.find("span", class_="a-price-fraction").text

            price = float(f"{whole_number}.{decimal}")

            item_data = ProductPageData(
                name=asin,
                title=title,
                url=product_url,
                pricing_unit=price_symbol,
                price=price,
                feature_1=features[0] if len(features) > 0 else "n/a",
                feature_2=features[1] if len(features) > 1 else "n/a",
                feature_3=features[2] if len(features) > 2 else "n/a",
                feature_4=features[3] if len(features) > 3 else "n/a",
                images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a",
                images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a",
                images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a",
                images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a"
            )

            product_pipeline.add_data(item_data)
            product_pipeline.close_pipeline()

            success = True

        except Exception as e:
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1
    return None


def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    """Run ``parse_product`` on a thread pool for every row of ``csv_filename``.

    Args:
        csv_filename: Path of a CSV file with a header row.
        location: Country code forwarded to ``parse_product``.
        retries: Retry budget forwarded to ``parse_product``.
        threads: Number of worker threads.
    """
    # Read with the same newline/encoding settings DataPipeline writes with,
    # rather than the platform default encoding.
    with open(csv_filename, newline='', encoding='utf-8') as csvfile:
        reader = list(csv.DictReader(csvfile))
    print(len(reader))

    with ThreadPoolExecutor(max_workers=threads) as executor:
        executor.map(parse_product, reader, [location] * len(reader), [retries] * len(reader))




if __name__ == "__main__":

    # Run configuration.
    LOCATION = "us"
    MAX_THREADS = 3
    PAGES = 20
    MAX_RETRIES = 2
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []

    # Phase 1: crawl the search results for every keyword.
    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        filename = f"{product}.csv"
        AGGREGATE_PRODUCTS.append(filename)

    # Phase 2: scrape the product page of every row in each results file.
    for product in AGGREGATE_PRODUCTS:
        threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)

The code above gives you a production-ready Amazon scraper fully integrated with the ScrapeOps Proxy API. To change your results, simply change your constants.

If you want detailed results on only one page of a search, change PAGES to 1. If you wish to run with 10 threads, change MAX_THREADS to 10... use caution with this one, each thread opens up another page in the proxy and ScrapeOps proxy does have a concurrency limit.


How To Architect Our Amazon Scraper

When we scrape Amazon, we need to scrape both search results and individual page data. When we search on Amazon, we get a bunch of pages and each page has a bunch of results.

Each item in our search also has its own page containing specific details about the item. You can get a better feel for these things if you take a look at the images below.

Results Page Amazon Search Results Page

The Results page holds most of the information we want to scrape such as the product name, sale price, real price, and rating.

Product Page Amazon Product Page The Product page holds much of the information we already find in the Result page and more. In our case specifically, the Product page holds bullet points describing the item and images of said item.

We find a phone we're interested in using the results. We learn more about that phone from our product page.


Understanding How To Scrape Amazon

Before plunging head first into code, we're going to talk about how our scraper works on a high level. In this section, we're going over the required steps in greater detail. If you've got some experience in web scraping already, feel free to skip this section.

Step 1: How To Request Amazon Pages

Let's take a better look at the URL from the page we looked at earlier.

Amazon Search Page URL

We are interested in this part of the URL:

https://www.amazon.com/s?k=phone
  • https://www.amazon.com/ is our base url.
  • s? shows that we're performing a search query.
  • k=phone tells the Amazon server that we want to look at phones.

Step 2: How To Extract Data From Amazon Pages

While some sites store their data conveniently in a JSON blob, Amazon does not. Amazon nests their data deeply within divs and spans. To extract our data, we need to pull it from these elements nested within the HTML.

Let's first take a look at the Results page. Below, you can see an item title with the inspect window open. If you look closely, you'll see the title text is nested within a span element.

Amazon Search Results Page Inspection

Now, let's take a look at the product page. Look closely here as well. Our feature bullets are actually span elements nested within li (list) elements.

Amazon Product Page Inspection

--

Step 3: How To Control Pagination

Controlling our pagination is very easy. It just requires an additional parameter to our URL. When pagination is added in, our URL will look like this:

https://www.amazon.com/s?k={product_name}&page={page_number}

So if we want to search page 1 of phones, this would be our URL:

https://www.amazon.com/s?k=phone&page=1

Step 4: Geolocated Data

Amazon does serve different content based on our location. If we're in the US, prices will be denoted in dollars, $. If we're in the UK, Amazon will give us our prices in the pound, GBP.

To control our location effectively, we'll be using the ScrapeOps Proxy API. The ScrapeOps API will route our traffic through servers in whichever country we ask for.

If we want to be in the UK, ScrapeOps will put us in the UK. If we want to be from the US, ScrapeOps will route us through servers in the US.


Setting Up Our Amazon Scraper Project

Now that we know what we want to do, let's get started on building our scraper. First we'll make a new project folder, and then we'll initialize a virtual environment and install dependencies.

Create a New Folder

mkdir amazon-scraper

From inside your new folder, create a new virtual environment.

Create a New Virtual Environment

python -m venv venv

Activate the Virtual Environment

source venv/bin/activate

Install Dependencies

pip install requests
pip install beautifulsoup4

Build An Amazon Search Crawler

The first portion of our project will be spent building a crawler to scrape Amazon search results. This crawler will actually be grabbing the bulk of our data. This crawler needs to:

  • parse results
  • manage result batches using pagination
  • store results from those pages
  • search multiple pages concurrently
  • integrate with a proxy for both location support and anti-bot resistance

Step 1: Create Simple Search Data Parser

Let's get started by creating a crawler that simply parses a Results page. Here is a scraper with a simple parsing function.

  • The parsing function below first finds all the div elements on the page.
  • Then it checks if each div is parsable.
  • If the div is parsable, we find the h2 element and strip out the whitespace and newlines.
  • If the stripped title text is not empty, we move on and extract the following from each listing:
    • asin
    • title
    • url
    • is_ad
    • pricing_unit
    • price
    • real_price
    • rating
import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = "YOUR-SUPER-SECRET-API-KEY"



def search_products(product_name: str, retries=3):
    """Fetch the Amazon search page for ``product_name`` and print each parsed listing.

    Args:
        product_name: Search keyword(s).
        retries: Maximum number of attempts before giving up.
    """
    # Encode the keyword so spaces or '&' cannot break the query string.
    query = urlencode({"k": product_name})
    url = f"https://www.amazon.com/s?{query}"
    tries = 0
    success = False

    while tries < retries and not success:
        try:
            resp = requests.get(url)
            if resp.status_code != 200:
                # tries is incremented in the handler, so this attempt is
                # subtracted here to report the real remainder.
                raise Exception(
                    f"Failed to scrape the page, Status Code {resp.status_code}, "
                    f"tries left: {retries - tries - 1}"
                )
            logger.info("Successfully fetched page")
            soup = BeautifulSoup(resp.text, "html.parser")

            last_title = ""
            for div in soup.find_all("div"):
                h2 = div.find("h2")
                title = h2.text.strip() if h2 is not None else ""
                # div.find() searches descendants, so enclosing divs can
                # yield the <h2> that was just handled; skip those repeats.
                if not title or title == last_title:
                    continue
                last_title = title

                symbol_element = div.find("span", class_="a-price-symbol")
                pricing_unit = symbol_element.text if symbol_element is not None else ""
                # Only priced listings are reported; skipping here also stops
                # a previous listing's currency symbol from being reused.
                if not pricing_unit:
                    continue

                a = h2.find("a")
                # href can be absent, so fall back to "" before the "sspa" test.
                product_url = (a.get("href") or "") if a is not None else ""
                ad_status = "sspa" in product_url
                asin = div.get("data-asin")
                prices = div.find_all("span", class_="a-offscreen")

                rating_element = div.find("span", class_="a-icon-alt")
                rating_text = rating_element.text[0:3] if rating_element is not None else "0.0"
                price_text = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
                if not rating_text or not price_text:
                    continue

                try:
                    rating = float(rating_text)
                    price = float(price_text)
                    if len(prices) > 1:
                        real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", ""))
                    else:
                        real_price = price
                except ValueError:
                    # One unparsable listing must not abort the whole page.
                    logger.warning(f"Skipping listing with unparsable numbers: {title}")
                    continue

                product = {
                    "name": asin,
                    "title": title,
                    "url": product_url,
                    "is_ad": ad_status,
                    "pricing_unit": pricing_unit,
                    "price": price,
                    "real_price": real_price,
                    "rating": rating
                }

                print(product)

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")

    print(f"Exited search_products for :{product_name}")




if __name__ == "__main__":

    # Attempt budget per search, then the keywords to search for.
    MAX_RETRIES = 2
    PRODUCTS = ["phone"]

    for product in PRODUCTS:
        search_products(product, retries=MAX_RETRIES)

If you run this example, you'll get the following error. Amazon Blocked Terminal Message

Amazon will continue to block us because we appear abnormal. We'll address this later on in our scraper when we add proxy support.


Step 2: Add Pagination

Now that we can parse a page, let's add pagination into our parsing function. Pagination gives us the ability to control our result batches. If we want page 1, fetch page 1. If we want page 2, fetch page 2... and so on and so forth.

The code example below is almost exactly the same as before. The major difference: we have a page_number added to both our function arguments and our URL.

import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = "YOUR-SUPER-SECRET-API-KEY"



def search_products(product_name: str, page_number=1, retries=3):
    """Fetch one Amazon search-results page and print each parsed listing.

    Args:
        product_name: Search keyword(s).
        page_number: Results page to request.
        retries: Maximum number of attempts before giving up.
    """
    # Encode the query so keywords containing spaces or '&' stay intact.
    query = urlencode({"k": product_name, "page": page_number})
    url = f"https://www.amazon.com/s?{query}"
    tries = 0
    success = False

    while tries < retries and not success:
        try:
            resp = requests.get(url)
            if resp.status_code != 200:
                # tries is incremented in the handler, so this attempt is
                # subtracted here to report the real remainder.
                raise Exception(
                    f"Failed to scrape the page, Status Code {resp.status_code}, "
                    f"tries left: {retries - tries - 1}"
                )
            logger.info("Successfully fetched page")
            soup = BeautifulSoup(resp.text, "html.parser")

            last_title = ""
            for div in soup.find_all("div"):
                h2 = div.find("h2")
                title = h2.text.strip() if h2 is not None else ""
                # div.find() searches descendants, so enclosing divs can
                # yield the <h2> that was just handled; skip those repeats.
                if not title or title == last_title:
                    continue
                last_title = title

                symbol_element = div.find("span", class_="a-price-symbol")
                pricing_unit = symbol_element.text if symbol_element is not None else ""
                # Only priced listings are reported; skipping here also stops
                # a previous listing's currency symbol from being reused.
                if not pricing_unit:
                    continue

                a = h2.find("a")
                # href can be absent, so fall back to "" before the "sspa" test.
                product_url = (a.get("href") or "") if a is not None else ""
                ad_status = "sspa" in product_url
                asin = div.get("data-asin")
                prices = div.find_all("span", class_="a-offscreen")

                rating_element = div.find("span", class_="a-icon-alt")
                rating_text = rating_element.text[0:3] if rating_element is not None else "0.0"
                price_text = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
                if not rating_text or not price_text:
                    continue

                try:
                    rating = float(rating_text)
                    price = float(price_text)
                    if len(prices) > 1:
                        real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", ""))
                    else:
                        real_price = price
                except ValueError:
                    # One unparsable listing must not abort the whole page.
                    logger.warning(f"Skipping listing with unparsable numbers: {title}")
                    continue

                product = {
                    "name": asin,
                    "title": title,
                    "url": product_url,
                    "is_ad": ad_status,
                    "pricing_unit": pricing_unit,
                    "price": price,
                    "real_price": real_price,
                    "rating": rating
                }

                print(product)

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")

    print(f"Exited search_products for :{product_name}")




if __name__ == "__main__":

    # Attempt budget per search, then the keywords to search for.
    MAX_RETRIES = 2
    PRODUCTS = ["phone"]

    # page_number is left at its default, so only the first page is fetched.
    for product in PRODUCTS:
        search_products(product, retries=MAX_RETRIES)

As you can see above, not much has changed at all in our code. Our function now takes a page_number and inserts it into our url.


Step 3: Storing the Scraped Data

Now that our crawler can choose a page to scrape, it's time to give it the ability to store our data. In this section, we'll add a couple classes to do just that: ProductData and DataPipeline.

Here is our updated code example.

import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One listing taken from an Amazon search-results page.

    After construction every string field is normalised: an empty string
    becomes ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace.
    """

    # NOTE: no trailing commas after the defaults -- a trailing comma would
    # turn each default into a one-element tuple such as ("",).
    name: str = ""
    title: str = ""
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: float = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        for spec in fields(self):
            value = getattr(self, spec.name)
            # Non-string fields (bool, float, None) are left untouched.
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())


class DataPipeline:
    """Queue scraped records, drop repeats by ``name``, and append them to a CSV file.

    Records must be dataclass instances with a ``name`` attribute. A
    re-entrant lock serialises queue updates and file writes so one pipeline
    can be shared between worker threads.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        # Imported here so the class does not rely on the module's import block.
        import threading

        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False
        # Re-entrant: add_data() holds the lock while it calls save_to_csv().
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append all queued records to ``csv_filename`` and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        with self._lock:
            # Return before touching the flag so an empty flush cannot leave
            # csv_file_open stuck at True.
            if not self.storage_queue:
                return
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()

            self.csv_file_open = True
            try:
                keys = [spec.name for spec in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)

                    if not file_exists:
                        writer.writeheader()

                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset the flag even when the write raises.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.append(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued.

        The lock makes an in-progress write finish first, which replaces the
        original ``time.sleep(3)`` wait (``time`` was never imported).
        """
        self.save_to_csv()


def search_products(product_name: str, page_number=1, retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page and store each parsed listing.

    Every listing with a title and a price symbol becomes a ``ProductData``
    record that is handed to ``data_pipeline``.

    Args:
        product_name: Search keyword(s).
        page_number: Results page to request.
        retries: Maximum number of attempts for this page.
        data_pipeline: Object exposing ``add_data``; when ``None`` the
            records are printed instead of stored.
    """
    # Encode the query so keywords containing spaces or '&' stay intact.
    query = urlencode({"k": product_name, "page": page_number})
    url = f"https://www.amazon.com/s?{query}"
    tries = 0
    success = False

    while tries < retries and not success:
        try:
            print(url)
            resp = requests.get(url)
            if resp.status_code != 200:
                # tries is incremented in the handler, so this attempt is
                # subtracted here to report the real remainder.
                raise Exception(
                    f"Failed to scrape the page, Status Code {resp.status_code}, "
                    f"tries left: {retries - tries - 1}"
                )
            logger.info("Successfully fetched page")
            soup = BeautifulSoup(resp.text, "html.parser")

            # Remove ad containers before walking the listings.
            for bad_div in soup.find_all("div", class_="AdHolder"):
                bad_div.decompose()

            # NOTE(review): every <div> is visited, so enclosing divs can
            # yield the same <h2>; the pipeline drops repeats by name.
            for div in soup.find_all("div"):
                h2 = div.find("h2")
                title = h2.text.strip() if h2 is not None else ""
                if not title:
                    continue

                symbol_element = div.find("span", class_="a-price-symbol")
                pricing_unit = symbol_element.text if symbol_element is not None else ""
                # Only priced listings are stored; skipping here also stops a
                # previous listing's currency symbol from being reused.
                if not pricing_unit:
                    continue

                a = h2.find("a")
                # href can be absent, so fall back to "" before the "sspa" test.
                product_url = (a.get("href") or "") if a is not None else ""
                ad_status = "sspa" in product_url
                asin = div.get("data-asin")
                prices = div.find_all("span", class_="a-offscreen")

                rating_element = div.find("span", class_="a-icon-alt")
                rating_text = rating_element.text[0:3] if rating_element is not None else "0.0"
                print(rating_text)
                print(title)
                price_text = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
                if not rating_text or not price_text:
                    continue

                try:
                    rating = float(rating_text)
                    price = float(price_text)
                    if len(prices) > 1:
                        real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", ""))
                    else:
                        real_price = price
                except ValueError:
                    # One unparsable listing must not abort the whole page.
                    logger.warning(f"Skipping listing with unparsable numbers: {title}")
                    continue

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )

                if data_pipeline is not None:
                    data_pipeline.add_data(product)
                else:
                    print(product)

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")

    print(f"Exited search_products for :{product_name}")




if __name__ == "__main__":

    # Attempt budget per search, then the keywords to search for.
    MAX_RETRIES = 2
    PRODUCTS = ["phone"]

    # Each keyword gets its own pipeline, and therefore its own CSV file.
    for product in PRODUCTS:
        product_pipeline = DataPipeline(csv_filename=f"{product}.csv")
        search_products(product, retries=MAX_RETRIES, data_pipeline=product_pipeline)
        product_pipeline.close_pipeline()

In the example above, we add our ProductData class to hold individual product data. We add a DataPipeline as well.

Our DataPipeline does all the heavy lifting of removing duplicates and saving our information to a CSV file.


Step 4: Adding Concurrency

When we added pagination earlier, we gave our crawler the ability to scrape different pages. Now that we can scrape a specific page and store its data, it's time to give our crawler the power to scrape a bunch of pages at once. With concurrency, we can do exactly that.

Here is our threaded_search() function.

def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1 through ``pages`` for ``product_name`` on a thread pool.

    All pages share one ``DataPipeline`` writing to ``<product_name>.csv``;
    it is flushed once the pool has finished.
    """
    pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One task per page, submitted up front.
        for page_number in range(1, pages + 1):
            pool.submit(search_products, product_name, page_number, location, retries, pipeline)

    pipeline.close_pipeline()

We use ThreadPoolExecutor to manage our threads. This function will use 5 threads to perform our searches by default, so we'll have a maximum of 5 searches going simultaneously.

Here is our updated code. We also added a location argument to search_products(). While we don't use the location in this example, we'll be using it in the next section when we add proxy support.

import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One listing taken from an Amazon search-results page.

    After construction every string field is normalised: an empty string
    becomes ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace.
    """

    # NOTE: no trailing commas after the defaults -- a trailing comma would
    # turn each default into a one-element tuple such as ("",).
    name: str = ""
    title: str = ""
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: float = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        for spec in fields(self):
            value = getattr(self, spec.name)
            # Non-string fields (bool, float, None) are left untouched.
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())


class DataPipeline:
    """Queue scraped records, drop repeats by ``name``, and append them to a CSV file.

    Records must be dataclass instances with a ``name`` attribute. A
    re-entrant lock serialises queue updates and file writes so one pipeline
    can be shared between worker threads.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        # Imported here so the class does not rely on the module's import block.
        import threading

        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False
        # Re-entrant: add_data() holds the lock while it calls save_to_csv().
        self._lock = threading.RLock()

    def save_to_csv(self):
        """Append all queued records to ``csv_filename`` and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        with self._lock:
            # Return before touching the flag so an empty flush cannot leave
            # csv_file_open stuck at True.
            if not self.storage_queue:
                return
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()

            self.csv_file_open = True
            try:
                keys = [spec.name for spec in fields(data_to_save[0])]
                file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
                with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)

                    if not file_exists:
                        writer.writeheader()

                    for item in data_to_save:
                        writer.writerow(asdict(item))
            finally:
                # Reset the flag even when the write raises.
                self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        with self._lock:
            if input_data.name in self.names_seen:
                logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
                return True
            self.names_seen.append(input_data.name)
            return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        with self._lock:
            if self.is_duplicate(scraped_data):
                return
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued.

        The lock makes an in-progress write finish first, which replaces the
        original ``time.sleep(3)`` wait (``time`` was never imported).
        """
        self.save_to_csv()


def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page and store each parsed listing.

    Every listing with a title and a price symbol becomes a ``ProductData``
    record that is handed to ``data_pipeline``.

    Args:
        product_name: Search keyword(s).
        page_number: Results page to request.
        location: Accepted for signature compatibility; not used by this
            version of the function.
        retries: Maximum number of attempts for this page.
        data_pipeline: Object exposing ``add_data``; when ``None`` the
            records are printed instead of stored.
    """
    # Encode the query so keywords containing spaces or '&' stay intact.
    query = urlencode({"k": product_name, "page": page_number})
    url = f"https://www.amazon.com/s?{query}"
    tries = 0
    success = False

    while tries < retries and not success:
        try:
            print(url)
            resp = requests.get(url)
            if resp.status_code != 200:
                # tries is incremented in the handler, so this attempt is
                # subtracted here to report the real remainder.
                raise Exception(
                    f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, "
                    f"tries left: {retries - tries - 1}"
                )
            logger.info("Successfully fetched page")
            soup = BeautifulSoup(resp.text, "html.parser")

            # Remove ad containers before walking the listings.
            for bad_div in soup.find_all("div", class_="AdHolder"):
                bad_div.decompose()

            # NOTE(review): every <div> is visited, so enclosing divs can
            # yield the same <h2>; the pipeline drops repeats by name.
            for div in soup.find_all("div"):
                h2 = div.find("h2")
                title = h2.text.strip() if h2 is not None else ""
                if not title:
                    continue

                symbol_element = div.find("span", class_="a-price-symbol")
                pricing_unit = symbol_element.text if symbol_element is not None else ""
                # Only priced listings are stored; skipping here also stops a
                # previous listing's currency symbol from being reused.
                if not pricing_unit:
                    continue

                a = h2.find("a")
                # href can be absent, so fall back to "" before the "sspa" test.
                product_url = (a.get("href") or "") if a is not None else ""
                ad_status = "sspa" in product_url
                asin = div.get("data-asin")
                prices = div.find_all("span", class_="a-offscreen")

                rating_element = div.find("span", class_="a-icon-alt")
                rating_text = rating_element.text[0:3] if rating_element is not None else "0.0"
                print(rating_text)
                print(title)
                price_text = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
                if not rating_text or not price_text:
                    continue

                try:
                    rating = float(rating_text)
                    price = float(price_text)
                    if len(prices) > 1:
                        real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", ""))
                    else:
                        real_price = price
                except ValueError:
                    # One unparsable listing must not abort the whole page.
                    logger.warning(f"Skipping listing with unparsable numbers: {title}")
                    continue

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )

                if data_pipeline is not None:
                    data_pipeline.add_data(product)
                else:
                    print(product)

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")

    print(f"Exited search_products for :{product_name}")

def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1 through ``pages`` for ``product_name`` on a thread pool.

    All pages share one ``DataPipeline`` writing to ``<product_name>.csv``;
    it is flushed once the pool has finished.
    """
    pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One task per page, submitted up front.
        for page_number in range(1, pages + 1):
            pool.submit(search_products, product_name, page_number, location, retries, pipeline)

    pipeline.close_pipeline()



if __name__ == "__main__":

    # Run configuration.
    LOCATION = "us"
    MAX_THREADS = 3
    PAGES = 5
    MAX_RETRIES = 2
    PRODUCTS = ["phone"]

    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)

If you run the code above, you'll still get blocked. To an Amazon server, our scraper already looked a bit abnormal. Now it's not only abnormal, it's exponentially faster than it was before. Let's add proxy support in the next section.


Step 5: Bypassing Anti-Bots

We're almost ready for our production run. It's time to add proxy support so Amazon stops blocking our crawler. We really only need to add one function here, get_scrapeops_url().

This function takes in a regular URL and uses URL encoding (urlencode) to convert it into a URL that routes the request through the ScrapeOps API. Take a look below:

def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches url from the given country.

    The API key, target URL and country are URL-encoded into the query string.
    """
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return "https://proxy.scrapeops.io/v1/?" + query_string

This function takes our URL and formats it into a proxied URL. Here is our updated code below.

import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One product card scraped from an Amazon search-results page.

    After construction every string field is normalised: an empty string is
    replaced by "No <field name>" and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    # No trailing commas after the defaults: `url: str = "",` would make the
    # default the tuple ("",) rather than an empty string.
    name: str = ""
    title: str = ""
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: float = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        # The loop variable is not named `field`, so it does not shadow the
        # `field` helper imported from dataclasses.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())


class DataPipeline:
    """Drop items whose `name` was already seen and append the rest to a CSV file.

    Items are buffered in `storage_queue` and written out once the queue
    reaches `storage_queue_limit`, or when `close_pipeline()` is called.
    `csv_file_open` is a plain boolean flag marking a write in progress; it
    is not a lock.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Return before raising the flag; otherwise an empty flush would
            # leave csv_file_open stuck at True and block every later batch.
            return

        self.csv_file_open = True
        try:
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)

                if not file_exists:
                    writer.writeheader()

                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag even when writing fails.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if input_data.name was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a new item and flush to disk when the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued, pausing first if a write is flagged."""
        # Imported here because the script's import block does not bring in `time`.
        import time

        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches url from the given country.

    The API key, target URL and country are URL-encoded into the query string.
    """
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return "https://proxy.scrapeops.io/v1/?" + query_string


def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page and queue every priced product card.

    The page is fetched through the ScrapeOps proxy. Every div that holds a
    non-empty h2 title and a price symbol is converted to a ProductData and
    passed to data_pipeline.add_data().

    Args:
        product_name: search term placed in the `k` query parameter.
        page_number: results page to request.
        location: country code forwarded to get_scrapeops_url().
        retries: maximum number of fetch attempts for this page.
        data_pipeline: object exposing add_data(item).
    """
    tries = 0
    success = False

    while tries < retries and not success:
        try:
            # The proxied URL is deliberately not printed: its query string
            # carries API_KEY.
            url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location)
            resp = requests.get(url)

            if resp.status_code != 200:
                raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}")

            logger.info("Successfully fetched page")
            soup = BeautifulSoup(resp.text, "html.parser")

            for bad_div in soup.find_all("div", class_="AdHolder"):
                bad_div.decompose()

            for div in soup.find_all("div"):
                h2 = div.find("h2")
                if not h2 or not h2.text.strip():
                    continue

                title = h2.text.strip()
                a = h2.find("a")
                # An anchor without href yields None; fall back to "" so the
                # substring test below cannot raise.
                product_url = (a.get("href") or "") if a else ""
                ad_status = "sspa" in product_url
                asin = div.get("data-asin")

                symbol_element = div.find("span", class_="a-price-symbol")
                pricing_unit = symbol_element.text if symbol_element else ""
                if not pricing_unit:
                    # Cards without a price symbol were never stored; skipping
                    # them here avoids using an unbound or stale pricing_unit.
                    continue

                prices = div.find_all("span", class_="a-offscreen")
                rating_element = div.find("span", class_="a-icon-alt")
                rating_present = rating_element.text[0:3] if rating_element else "0.0"
                price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
                logger.debug("rating=%s title=%s", rating_present, title)

                if not price_present:
                    continue

                try:
                    rating = float(rating_present)
                    price = float(price_present)
                    real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
                except ValueError as parse_error:
                    # One malformed card must not fail (and re-fetch) the whole page.
                    logger.warning("Skipping unparsable product card %r: %s", title, parse_error)
                    continue

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")

    print(f"Exited scrape_products for :{product_name}")

def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Crawl result pages 1..pages for product_name on a thread pool.

    All workers share one DataPipeline that writes to "<product_name>.csv";
    the pipeline is closed once the pool has shut down.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    page_numbers = list(range(1, pages + 1))
    job_count = len(page_numbers)

    # executor.map walks the argument lists in lockstep, so every per-page
    # call receives the same product name, location, retry count and pipeline.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(
            search_products,
            [product_name] * job_count,
            page_numbers,
            [location] * job_count,
            [retries] * job_count,
            [search_pipeline] * job_count,
        )

    search_pipeline.close_pipeline()



if __name__ == "__main__":
    # Run settings for the crawl.
    PRODUCTS = ["phone"]  # search terms; threaded_search is called once per term
    PAGES = 5
    LOCATION = "us"
    MAX_THREADS = 3
    MAX_RETRIES = 2

    for product in PRODUCTS:
        threaded_search(
            product,
            PAGES,
            max_workers=MAX_THREADS,
            location=LOCATION,
            retries=MAX_RETRIES,
        )

Now that we can get past anti-bots, we're ready to move on to our production run.


Step 6: Production Run

Time for our production run. Take a look at our main function below.

if __name__ == "__main__":
    # Run settings for the production crawl.
    PRODUCTS = ["phone"]  # search terms; threaded_search is called once per term
    PAGES = 10
    LOCATION = "us"
    MAX_THREADS = 3
    MAX_RETRIES = 2

    for product in PRODUCTS:
        threaded_search(
            product,
            PAGES,
            max_workers=MAX_THREADS,
            location=LOCATION,
            retries=MAX_RETRIES,
        )

You can change any of the following constants to change your results:

  • PRODUCTS
  • MAX_RETRIES
  • PAGES
  • MAX_THREADS
  • LOCATION

To run this scraper, replace the filename below with whatever you chose to name yours.

python crawler-proxy.py

Our final scraper generated a report on 10 pages full of phones in 26 seconds.

Amazon Crawler Results

Here is the report it created:

Amazon Crawler Report in CSV File


Build An Amazon Product Scraper

Now it's time to build a scraper that looks up individual products. From these individual product pages, we need to extract feature bullets, prices, and images. This way, if you're interested in a product, simply pull up your report for that product!

Step 1: Create Simple Amazon Product Page Data Parser

Here's a parsing function that retrieves data from a product page. We're not ready to add it into our scraper because we need the ability to read the CSV we created earlier.

def parse_product(product_object, location="us", retries=3):
    """Fetch one Amazon product page and print its price, feature bullets and images.

    Args:
        product_object: mapping with a "url" key holding the product path
            (presumably a row of the crawler's CSV report; verify against caller).
        location: accepted for call compatibility; not used in this version.
        retries: number of additional attempts made after the first failure.
    """
    url = product_object["url"]
    tries = 0
    success = False

    product_url = f"https://www.amazon.com/{url}"

    # Title and ASIN are read from fixed positions in the URL path.
    url_array = product_url.split("/")
    title = url_array[-4]
    asin = url_array[-2]

    print("asin", asin, title)

    while tries <= retries and not success:
        try:
            # Request the absolute URL; the raw `url` value is only a path.
            resp = requests.get(product_url)
            if resp.status_code != 200:
                raise Exception(f"Failed response from server, status code: {resp.status_code}")

            print("Content Fetched")
            soup = BeautifulSoup(resp.text, "html.parser")

            # find all the images
            images_to_save = []
            for span in soup.find_all("span"):
                for item in span.find_all("span"):
                    image_span = item.find("span")
                    if image_span is None:
                        continue
                    for image in image_span.find_all("img"):
                        image_link = image.get("src")
                        # An <img> without src yields None; skip it instead of raising.
                        if (
                            image_link
                            and "https://m.media-amazon.com/images/" in image_link
                            and image_link not in images_to_save
                        ):
                            images_to_save.append(image_link)

            features = []
            for feature in soup.find_all("li", class_="a-spacing-mini"):
                feature_span = feature.find("span")
                if feature_span is None:
                    continue
                text = feature_span.text
                if text not in features:
                    features.append(text)

            price_symbol = soup.find("span", class_="a-price-symbol").text
            # Keep only the digits of the whole part so thousands separators
            # ("1,299.") no longer produce an unparsable "1.299.99".
            whole_text = soup.find("span", class_="a-price-whole").text
            whole_number = "".join(ch for ch in whole_text if ch.isdigit())
            decimal = soup.find("span", class_="a-price-fraction").text.strip()

            price = float(f"{whole_number}.{decimal}")

            item_data = {
                "name": asin,
                "title": title,
                "url": product_url,
                "pricing_unit": price_symbol,
                "price": price,
                "feature_1": features[0] if len(features) > 0 else "n/a",
                "feature_2": features[1] if len(features) > 1 else "n/a",
                "feature_3": features[2] if len(features) > 2 else "n/a",
                "feature_4": features[3] if len(features) > 3 else "n/a",
                "images_1": images_to_save[0] if len(images_to_save) > 0 else "n/a",
                "images_2": images_to_save[1] if len(images_to_save) > 1 else "n/a",
                "images_3": images_to_save[2] if len(images_to_save) > 2 else "n/a",
                "images_4": images_to_save[3] if len(images_to_save) > 3 else "n/a"
            }

            print("Product Page Data:", item_data)

            success = True

        except Exception as e:
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1

In the above function, we pull the features and item images from the product page. These will be used in the individual report we generate for each product.


Step 2: Loading URLs To Scrape

Now it's time to give our code the ability to run. In order to parse these items, we need to read them from a CSV file and then pass them into our parse function. The code example below adds a threaded_item_lookup() function.

At the moment, this function does not use threading. We just have a for loop as a placeholder. This function reads the CSV file and then passes each object from the file into parse_product().

import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One product card scraped from an Amazon search-results page.

    After construction every string field is normalised: an empty string is
    replaced by "No <field name>" and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    # No trailing commas after the defaults: `url: str = "",` would make the
    # default the tuple ("",) rather than an empty string.
    name: str = ""
    title: str = ""
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: float = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        # The loop variable is not named `field`, so it does not shadow the
        # `field` helper imported from dataclasses.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())


class DataPipeline:
    """Drop items whose `name` was already seen and append the rest to a CSV file.

    Items are buffered in `storage_queue` and written out once the queue
    reaches `storage_queue_limit`, or when `close_pipeline()` is called.
    `csv_file_open` is a plain boolean flag marking a write in progress; it
    is not a lock.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Return before raising the flag; otherwise an empty flush would
            # leave csv_file_open stuck at True and block every later batch.
            return

        self.csv_file_open = True
        try:
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)

                if not file_exists:
                    writer.writeheader()

                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag even when writing fails.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if input_data.name was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a new item and flush to disk when the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued, pausing first if a write is flagged."""
        # Imported here because the script's import block does not bring in `time`.
        import time

        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches url from the given country.

    The API key, target URL and country are URL-encoded into the query string.
    """
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return "https://proxy.scrapeops.io/v1/?" + query_string


def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page and queue every priced product card.

    The page is fetched through the ScrapeOps proxy. Every div that holds a
    non-empty h2 title and a price symbol is converted to a ProductData and
    passed to data_pipeline.add_data().

    Args:
        product_name: search term placed in the `k` query parameter.
        page_number: results page to request.
        location: country code forwarded to get_scrapeops_url().
        retries: maximum number of fetch attempts for this page.
        data_pipeline: object exposing add_data(item).
    """
    tries = 0
    success = False

    while tries < retries and not success:
        try:
            url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location)
            resp = requests.get(url)

            if resp.status_code != 200:
                raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}")

            logger.info("Successfully fetched page")
            soup = BeautifulSoup(resp.text, "html.parser")

            for bad_div in soup.find_all("div", class_="AdHolder"):
                bad_div.decompose()

            for div in soup.find_all("div"):
                h2 = div.find("h2")
                if not h2 or not h2.text.strip():
                    continue

                title = h2.text.strip()
                a = h2.find("a")
                # An anchor without href yields None; fall back to "" so the
                # substring test below cannot raise.
                product_url = (a.get("href") or "") if a else ""
                ad_status = "sspa" in product_url
                asin = div.get("data-asin")

                symbol_element = div.find("span", class_="a-price-symbol")
                pricing_unit = symbol_element.text if symbol_element else ""
                if not pricing_unit:
                    # Cards without a price symbol were never stored; skipping
                    # them here avoids using an unbound or stale pricing_unit.
                    continue

                prices = div.find_all("span", class_="a-offscreen")
                rating_element = div.find("span", class_="a-icon-alt")
                rating_present = rating_element.text[0:3] if rating_element else "0.0"
                price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
                logger.debug("rating=%s title=%s", rating_present, title)

                if not price_present:
                    continue

                try:
                    rating = float(rating_present)
                    price = float(price_present)
                    real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
                except ValueError as parse_error:
                    # One malformed card must not fail (and re-fetch) the whole page.
                    logger.warning("Skipping unparsable product card %r: %s", title, parse_error)
                    continue

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")

    print(f"Exited scrape_products for :{product_name}")

def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Crawl result pages 1..pages for product_name on a thread pool.

    All workers share one DataPipeline that writes to "<product_name>.csv";
    the pipeline is closed once the pool has shut down.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    page_numbers = list(range(1, pages + 1))
    job_count = len(page_numbers)

    # executor.map walks the argument lists in lockstep, so every per-page
    # call receives the same product name, location, retry count and pipeline.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(
            search_products,
            [product_name] * job_count,
            page_numbers,
            [location] * job_count,
            [retries] * job_count,
            [search_pipeline] * job_count,
        )

    search_pipeline.close_pipeline()


def parse_product(product_object, location="us", retries=3):
    """Fetch one Amazon product page and print its price, feature bullets and images.

    Args:
        product_object: mapping with a "url" key holding the product path
            (a row of the CSV read by threaded_item_lookup).
        location: accepted for call compatibility; not used in this version.
        retries: number of additional attempts made after the first failure.
    """
    url = product_object["url"]
    tries = 0
    success = False

    product_url = f"https://www.amazon.com/{url}"

    # Title and ASIN are read from fixed positions in the URL path.
    url_array = product_url.split("/")
    title = url_array[-4]
    asin = url_array[-2]

    print("asin", asin, title)

    while tries <= retries and not success:
        try:
            resp = requests.get(product_url)
            if resp.status_code != 200:
                raise Exception(f"Failed response from server, status code: {resp.status_code}")

            print("Content Fetched")
            soup = BeautifulSoup(resp.text, "html.parser")

            # find all the images
            images_to_save = []
            for span in soup.find_all("span"):
                for item in span.find_all("span"):
                    image_span = item.find("span")
                    if image_span is None:
                        continue
                    for image in image_span.find_all("img"):
                        image_link = image.get("src")
                        # An <img> without src yields None; skip it instead of raising.
                        if (
                            image_link
                            and "https://m.media-amazon.com/images/" in image_link
                            and image_link not in images_to_save
                        ):
                            images_to_save.append(image_link)

            features = []
            for feature in soup.find_all("li", class_="a-spacing-mini"):
                feature_span = feature.find("span")
                if feature_span is None:
                    continue
                text = feature_span.text
                if text not in features:
                    features.append(text)

            price_symbol = soup.find("span", class_="a-price-symbol").text
            # Keep only the digits of the whole part so thousands separators
            # ("1,299.") no longer produce an unparsable "1.299.99".
            whole_text = soup.find("span", class_="a-price-whole").text
            whole_number = "".join(ch for ch in whole_text if ch.isdigit())
            decimal = soup.find("span", class_="a-price-fraction").text.strip()

            price = float(f"{whole_number}.{decimal}")

            item_data = {
                "name": asin,
                "title": title,
                "url": product_url,
                "pricing_unit": price_symbol,
                "price": price,
                "feature_1": features[0] if len(features) > 0 else "n/a",
                "feature_2": features[1] if len(features) > 1 else "n/a",
                "feature_3": features[2] if len(features) > 2 else "n/a",
                "feature_4": features[3] if len(features) > 3 else "n/a",
                "images_1": images_to_save[0] if len(images_to_save) > 0 else "n/a",
                "images_2": images_to_save[1] if len(images_to_save) > 1 else "n/a",
                "images_3": images_to_save[2] if len(images_to_save) > 2 else "n/a",
                "images_4": images_to_save[3] if len(images_to_save) > 3 else "n/a"
            }

            print("Product Page Data:", item_data)

            success = True

        except Exception as e:
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1



def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    """Read the crawler's CSV report and parse every listed product in turn.

    `threads` is accepted but unused in this version: rows are processed
    sequentially by a plain for loop.
    """
    # Open with newline="" and UTF-8 to match how DataPipeline.save_to_csv
    # writes the report; the platform default encoding may differ.
    with open(csv_filename, newline="", encoding="utf-8") as csvfile:
        reader = list(csv.DictReader(csvfile))

    for product_object in reader:
        parse_product(product_object, location=location, retries=retries)






if __name__ == "__main__":
    # Run settings shared by the crawl and the product lookup.
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []  # CSV reports produced by the crawl, one per search term
    MAX_RETRIES = 2
    PAGES = 1
    MAX_THREADS = 3
    LOCATION = "us"

    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        filename = f"{product}.csv"
        AGGREGATE_PRODUCTS.append(filename)

    for product in AGGREGATE_PRODUCTS:
        # Forward MAX_RETRIES so the lookup honours the configured retry count
        # instead of falling back to threaded_item_lookup's default.
        threaded_item_lookup(product, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)

You might be wondering why we use a separate field for each bullet point and image. The reason for this is actually pretty simple: the csv module has a very difficult time managing arrays, and once we convert this item into a @dataclass, each field maps to exactly one CSV column, so a single field can't hold a list whose length changes from product to product. (In Python, lists are mutable and can be any length.)


Step 3: Storing the Scraped Data

Similar to how we stored our data with the results crawler, we're going to be using a custom class to hold our data. We'll then pass this object into our DataPipeline to both filter and store our data. The code below adds a ProductPageData class and passes it into the pipeline.

import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One product card scraped from an Amazon search-results page.

    After construction every string field is normalised: an empty string is
    replaced by "No <field name>" and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    # No trailing commas after the defaults: `url: str = "",` would make the
    # default the tuple ("",) rather than an empty string.
    name: str = ""
    title: str = ""
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: float = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        # The loop variable is not named `field`, so it does not shadow the
        # `field` helper imported from dataclasses.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())

@dataclass
class ProductPageData:
    """Details scraped from a single Amazon product page.

    Holds up to four feature bullets and four image links in separate
    fields. After construction every string field is normalised: an empty
    string is replaced by "No <field name>" and any other string is stripped
    of surrounding whitespace.
    """

    # No trailing commas after the defaults: `url: str = "",` would make the
    # default the tuple ("",) rather than an empty string.
    name: str = ""
    title: str = ""
    url: str = ""
    pricing_unit: str = ""
    price: float = None
    feature_1: str = ""
    feature_2: str = ""
    feature_3: str = ""
    feature_4: str = ""
    images_1: str = ""
    images_2: str = ""
    images_3: str = ""
    images_4: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        # The loop variable is not named `field`, so it does not shadow the
        # `field` helper imported from dataclasses.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())


class DataPipeline:
    """Drop items whose `name` was already seen and append the rest to a CSV file.

    Items are buffered in `storage_queue` and written out once the queue
    reaches `storage_queue_limit`, or when `close_pipeline()` is called.
    `csv_file_open` is a plain boolean flag marking a write in progress; it
    is not a lock.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Return before raising the flag; otherwise an empty flush would
            # leave csv_file_open stuck at True and block every later batch.
            return

        self.csv_file_open = True
        try:
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)

                if not file_exists:
                    writer.writeheader()

                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag even when writing fails.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if input_data.name was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a new item and flush to disk when the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued, pausing first if a write is flagged."""
        # Imported here because the script's import block does not bring in `time`.
        import time

        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches url from the given country.

    The API key, target URL and country are URL-encoded into the query string.
    """
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return "https://proxy.scrapeops.io/v1/?" + query_string


def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page and queue every priced product card.

    The page is fetched through the ScrapeOps proxy. Every div that holds a
    non-empty h2 title and a price symbol is converted to a ProductData and
    passed to data_pipeline.add_data().

    Args:
        product_name: search term placed in the `k` query parameter.
        page_number: results page to request.
        location: country code forwarded to get_scrapeops_url().
        retries: maximum number of fetch attempts for this page.
        data_pipeline: object exposing add_data(item).
    """
    tries = 0
    success = False

    while tries < retries and not success:
        try:
            url = get_scrapeops_url(f"https://www.amazon.com/s?k={product_name}&page={page_number}", location=location)
            resp = requests.get(url)

            if resp.status_code != 200:
                raise Exception(f"Failed to scrape the page {page_number}, Status Code {resp.status_code}, tries left: {retries-tries}")

            logger.info("Successfully fetched page")
            soup = BeautifulSoup(resp.text, "html.parser")

            for bad_div in soup.find_all("div", class_="AdHolder"):
                bad_div.decompose()

            for div in soup.find_all("div"):
                h2 = div.find("h2")
                if not h2 or not h2.text.strip():
                    continue

                title = h2.text.strip()
                a = h2.find("a")
                # An anchor without href yields None; fall back to "" so the
                # substring test below cannot raise.
                product_url = (a.get("href") or "") if a else ""
                ad_status = "sspa" in product_url
                asin = div.get("data-asin")

                symbol_element = div.find("span", class_="a-price-symbol")
                pricing_unit = symbol_element.text if symbol_element else ""
                if not pricing_unit:
                    # Cards without a price symbol were never stored; skipping
                    # them here avoids using an unbound or stale pricing_unit.
                    continue

                prices = div.find_all("span", class_="a-offscreen")
                rating_element = div.find("span", class_="a-icon-alt")
                rating_present = rating_element.text[0:3] if rating_element else "0.0"
                price_present = prices[0].text.replace(pricing_unit, "").replace(",", "") if prices else "0.0"
                logger.debug("rating=%s title=%s", rating_present, title)

                if not price_present:
                    continue

                try:
                    rating = float(rating_present)
                    price = float(price_present)
                    real_price = float(prices[1].text.replace(pricing_unit, "").replace(",", "")) if len(prices) > 1 else price
                except ValueError as parse_error:
                    # One malformed card must not fail (and re-fetch) the whole page.
                    logger.warning("Skipping unparsable product card %r: %s", title, parse_error)
                    continue

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")

    print(f"Exited scrape_products for :{product_name}")

def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Crawl result pages 1..pages for product_name on a thread pool.

    All workers share one DataPipeline that writes to "<product_name>.csv";
    the pipeline is closed once the pool has shut down.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    page_numbers = list(range(1, pages + 1))
    job_count = len(page_numbers)

    # executor.map walks the argument lists in lockstep, so every per-page
    # call receives the same product name, location, retry count and pipeline.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(
            search_products,
            [product_name] * job_count,
            page_numbers,
            [location] * job_count,
            [retries] * job_count,
            [search_pipeline] * job_count,
        )

    search_pipeline.close_pipeline()


def parse_product(product_object, location="us", retries=3):
    """Fetch one Amazon product page and store its details in "<title>.csv".

    Price, up to four feature bullets and up to four image links are wrapped
    in a ProductPageData and written through a DataPipeline created for this
    product.

    Args:
        product_object: mapping with a "url" key holding the product path
            (a row of the CSV read by threaded_item_lookup).
        location: accepted for call compatibility; not used in this version.
        retries: number of additional attempts made after the first failure.
    """
    url = product_object["url"]
    tries = 0
    success = False

    product_url = f"https://www.amazon.com/{url}"

    # Title and ASIN are read from fixed positions in the URL path.
    url_array = product_url.split("/")
    title = url_array[-4]
    asin = url_array[-2]

    product_pipeline = DataPipeline(csv_filename=f"{title}.csv")

    while tries <= retries and not success:
        try:
            resp = requests.get(product_url)
            if resp.status_code != 200:
                raise Exception(f"Failed response from server, status code: {resp.status_code}")

            soup = BeautifulSoup(resp.text, "html.parser")

            # find all the images
            images_to_save = []
            for span in soup.find_all("span"):
                for item in span.find_all("span"):
                    image_span = item.find("span")
                    if image_span is None:
                        continue
                    for image in image_span.find_all("img"):
                        image_link = image.get("src")
                        # An <img> without src yields None; skip it instead of raising.
                        if (
                            image_link
                            and "https://m.media-amazon.com/images/" in image_link
                            and image_link not in images_to_save
                        ):
                            images_to_save.append(image_link)

            features = []
            for feature in soup.find_all("li", class_="a-spacing-mini"):
                feature_span = feature.find("span")
                if feature_span is None:
                    continue
                text = feature_span.text
                if text not in features:
                    features.append(text)

            price_symbol = soup.find("span", class_="a-price-symbol").text
            # Keep only the digits of the whole part so thousands separators
            # ("1,299.") no longer produce an unparsable "1.299.99".
            whole_text = soup.find("span", class_="a-price-whole").text
            whole_number = "".join(ch for ch in whole_text if ch.isdigit())
            decimal = soup.find("span", class_="a-price-fraction").text.strip()

            price = float(f"{whole_number}.{decimal}")

            item_data = ProductPageData(
                name=asin,
                title=title,
                url=product_url,
                pricing_unit=price_symbol,
                price=price,
                feature_1=features[0] if len(features) > 0 else "n/a",
                feature_2=features[1] if len(features) > 1 else "n/a",
                feature_3=features[2] if len(features) > 2 else "n/a",
                feature_4=features[3] if len(features) > 3 else "n/a",
                images_1=images_to_save[0] if len(images_to_save) > 0 else "n/a",
                images_2=images_to_save[1] if len(images_to_save) > 1 else "n/a",
                images_3=images_to_save[2] if len(images_to_save) > 2 else "n/a",
                images_4=images_to_save[3] if len(images_to_save) > 3 else "n/a"
            )

            product_pipeline.add_data(item_data)
            product_pipeline.close_pipeline()

            success = True

        except Exception as e:
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1



def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    """Read the crawler's CSV report and parse every listed product in turn.

    `threads` is accepted but unused in this version: rows are processed
    sequentially by a plain for loop.
    """
    # Open with newline="" and UTF-8 to match how DataPipeline.save_to_csv
    # writes the report; the platform default encoding may differ.
    with open(csv_filename, newline="", encoding="utf-8") as csvfile:
        reader = list(csv.DictReader(csvfile))

    for product_object in reader:
        parse_product(product_object, location=location, retries=retries)






if __name__ == "__main__":
    # Run settings shared by the crawl and the product lookup.
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []  # CSV reports produced by the crawl, one per search term
    PAGES = 1
    LOCATION = "us"
    MAX_THREADS = 3
    MAX_RETRIES = 2

    # Phase 1: crawl the search results for each term and remember its report file.
    for product in PRODUCTS:
        threaded_search(
            product,
            PAGES,
            max_workers=MAX_THREADS,
            location=LOCATION,
            retries=MAX_RETRIES,
        )
        filename = f"{product}.csv"
        AGGREGATE_PRODUCTS.append(filename)

    # Phase 2: look up every product listed in each report.
    for product in AGGREGATE_PRODUCTS:
        threaded_item_lookup(
            product,
            location=LOCATION,
            retries=MAX_RETRIES,
            threads=MAX_THREADS,
        )

You may have also noticed that from inside parse_product(), we open up an individual pipeline for each product. This way, we generate an individual report for each one of the products we scraped earlier with the crawler.


Step 4: Adding Concurrency

Now, we're going to add concurrency so we can parse multiple products at once. This is very similar to when we added concurrency to the crawler earlier.

Take a look at the function below, it's the finished version of threaded_item_lookup():

def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    """Read the crawler's CSV report and parse its products on a thread pool.

    Each row of the report is passed to parse_product() together with the
    same location and retries values; at most `threads` rows run at once.
    """
    # Open with newline="" and UTF-8 to match how DataPipeline.save_to_csv
    # writes the report; the platform default encoding may differ.
    with open(csv_filename, newline="", encoding="utf-8") as csvfile:
        reader = list(csv.DictReader(csvfile))

    row_count = len(reader)
    with ThreadPoolExecutor(max_workers=threads) as executor:
        executor.map(parse_product, reader, [location] * row_count, [retries] * row_count)

Aside from the small changes in this function, everything else remains the same. In the next example, we'll add proxy support.


Step 5: Bypassing Anti-Bots

As you learned earlier in this article, Amazon will definitely block you if your scraper seems like it could be suspicious. Our scraper already looked abnormal; after adding concurrency, it looks really abnormal.

In this example, we're going to change one line of code and make the entire thing work.

resp = requests.get(get_scrapeops_url(product_url, location=location))

In parse_product(), we simply convert our URL into a proxied one.

Here is the full code:

import requests
from bs4 import BeautifulSoup
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One product card scraped from an Amazon search-results page.

    After construction every string field is normalised: an empty string is
    replaced by "No <field name>" and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    # No trailing commas after the defaults: `url: str = "",` would make the
    # default the tuple ("",) rather than an empty string.
    name: str = ""
    title: str = ""
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: float = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        # The loop variable is not named `field`, so it does not shadow the
        # `field` helper imported from dataclasses.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())

@dataclass
class ProductPageData:
name: str = ""
title: str = ""
url: str = "",
pricing_unit: str = "",
price: float = None,
feature_1: str = ""