Skip to main content

Scrape Amazon With Selenium

How to Scrape Amazon With Selenium

Amazon is the largest online retailer in the world and one of the largest overall retailers in the world. If you're looking for anything online, you'll probably check Amazon first.

Amazon offers an unparalleled wealth of product data and consumer insights, providing numerous opportunities for analysis, market research, and strategic decision-making.

In this guide, we'll take you through how to scrape Amazon using Python Selenium.


TLDR - How to Scrape Amazon

If you are looking for a production-ready Amazon scraper, follow the script below:

from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

# Emit INFO-level (and above) log records from this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Chrome options shared by every driver started below; "--headless" runs Chrome without a window.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")

# Sent as the "api_key" query parameter by get_scrapeops_url(); replace the placeholder with your own key.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One listing scraped from an Amazon search-results page.

    After construction every string field is normalised: an empty string is
    replaced by the placeholder ``"No <field name>"`` and any other string is
    stripped of surrounding whitespace.  Non-string values are left untouched.
    """

    name: str = ""
    title: str = ""
    # The defaults below must not end in a comma: `url: str = "",` makes the
    # default the tuple ("",), which skips string normalisation and would be
    # written to the CSV as a tuple repr.
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: float = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            # Only string values are normalised; numbers, booleans and None pass through.
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())

@dataclass
class ProductPageData:
    """Details scraped from a single Amazon product page.

    After construction every string field is normalised: an empty string is
    replaced by the placeholder ``"No <field name>"`` and any other string is
    stripped of surrounding whitespace.  Non-string values are left untouched.
    """

    name: str = ""
    title: str = ""
    # The defaults below must not end in a comma: `url: str = "",` makes the
    # default the tuple ("",), which skips string normalisation and would be
    # written to the CSV as a tuple repr.
    url: str = ""
    pricing_unit: str = ""
    price: float = None
    feature_1: str = ""
    feature_2: str = ""
    feature_3: str = ""
    feature_4: str = ""
    images_1: str = ""
    images_2: str = ""
    images_3: str = ""
    images_4: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            # Only string values are normalised; numbers and None pass through.
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())


class DataPipeline:
    """Buffer scraped records, drop duplicates by ``name`` and append them to a CSV file."""

    def __init__(self, csv_filename='', storage_queue_limit=50):
        """Create an empty pipeline.

        Args:
            csv_filename: Path of the CSV file that records are appended to.
            storage_queue_limit: Number of queued records that triggers a flush.
        """
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always reset the flag, including on the empty-queue return and on
            # write errors; a flag stuck at True would stop add_data() from
            # ever flushing again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True (and log a warning) if ``input_data.name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a record unless it is a duplicate; flush once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once when scraping is finished."""
        if self.csv_file_open:
            # Imported locally: the file's import block does not bring in `time`,
            # so the bare time.sleep() call raised NameError.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that requests ``url`` with ``location`` as the country."""
    query_string = urlencode({
        "api_key": API_KEY,
        "url": url,
        "country": location
    })
    return "https://proxy.scrapeops.io/v1/?" + query_string


def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page through the ScrapeOps proxy.

    Every priced listing found on the page is turned into a ``ProductData``
    record and handed to ``data_pipeline.add_data``.  The page is attempted up
    to ``retries`` times, each time with a fresh Chrome driver; failures are
    logged rather than raised.

    Args:
        product_name: Search term placed in the ``k`` query parameter.
        page_number: Results page to request.
        location: Country code forwarded to ``get_scrapeops_url``.
        retries: Maximum number of attempts for this page.
        data_pipeline: Object whose ``add_data`` method receives each record.
    """
    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so the finally clause cannot hit an unbound
        # name when Chrome itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            url = f"https://www.amazon.com/s?k={product_name}&page={page_number}"
            proxy_url = get_scrapeops_url(url, location)
            driver.get(proxy_url)

            logger.info("Successfully fetched page")

            # Remove the ad-holder containers before collecting the result divs.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue
                h2 = h2s[0]
                title = h2.text

                # Consecutive divs can resolve to the same h2; skip immediate repeats.
                if title == last_title:
                    continue

                # Headings without a link are not product listings.
                anchors = h2.find_elements(By.TAG_NAME, "a")
                if not anchors:
                    continue
                product_url = (anchors[0].get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")

                ad_status = "sspa" in product_url

                # The ASIN is read from a fixed position in the URL path; skip
                # links that are too short to contain it.
                url_array = product_url.split("/")
                if len(url_array) <= 5:
                    continue
                asin = url_array[5]

                price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols_array:
                    continue
                pricing_unit = price_symbols_array[0].text

                # Drop thousands separators and any decimal point carried in the
                # whole part so values such as "1,299." still parse.
                whole_text = div.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
                fraction_text = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
                price = float(f"{whole_text}.{fraction_text}")

                # A listing without a rating element keeps an empty rating
                # instead of aborting the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else ""

                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "")
                    real_price = float(real_price_str)
                else:
                    real_price = price

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)

                last_title = title

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")


def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1..``pages`` for ``product_name`` on a thread pool.

    All workers share one DataPipeline writing to "<product_name>.csv"; the
    pipeline is closed after the pool has shut down.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    page_numbers = list(range(1, pages+1))
    page_count = len(page_numbers)

    # Each positional iterable supplies one argument of search_products per page.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(
            search_products,
            [product_name] * page_count,
            page_numbers,
            [location] * page_count,
            [retries] * page_count,
            [search_pipeline] * page_count
        )

    search_pipeline.close_pipeline()


def parse_product(product_object, location="us", retries=3):
    """Scrape one Amazon product page and append the result to "<title>.csv".

    The page is fetched through the ScrapeOps proxy.  Up to four image links,
    up to four feature bullets and the price are stored as one
    ``ProductPageData`` record.  Failures are logged, never raised from the
    retry loop.

    Args:
        product_object: Mapping with a "url" key holding the product URL.
        location: Country code forwarded to ``get_scrapeops_url``.
        retries: Retry budget; the loop runs while ``tries <= retries``.

    Returns:
        None.
    """
    product_url = product_object["url"]
    proxy_url = get_scrapeops_url(product_url, location=location)

    tries = 0
    success = False

    # The title slug and the ASIN are read from fixed positions in the URL path.
    url_array = product_url.split("/")
    title = url_array[-4]
    print(title)

    product_pipeline = DataPipeline(csv_filename=f"{title}.csv")
    asin = url_array[-2]

    def pick(items, index):
        """Return items[index], or "n/a" when the list is too short."""
        return items[index] if len(items) > index else "n/a"

    while tries <= retries and not success:
        # Bound before the try so a failed Chrome start-up is retried like any
        # other error and the cleanup below never touches an unbound name.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(proxy_url)

            images_to_save = []
            features = []

            for image in driver.find_elements(By.CSS_SELECTOR, "li img"):
                image_link = image.get_attribute("src")
                # An <img> without a src yields None; skip it.
                if not image_link:
                    continue
                if "https://m.media-amazon.com/images/I/" in image_link and image_link not in images_to_save:
                    images_to_save.append(image_link)

            for feature in driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini"):
                text = feature.find_element(By.TAG_NAME, "span").text
                if text not in features:
                    features.append(text)

            price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text
            whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
            decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text

            price = float(f"{whole_number}.{decimal}")

            if not images_to_save or not features:
                # Count an empty page as a failed attempt so `tries` always
                # advances and the loop is guaranteed to terminate.
                raise Exception("No images or feature bullets found on product page")

            item_data = ProductPageData(
                name=asin,
                title=title,
                url=product_url,
                pricing_unit=price_symbol,
                price=price,
                feature_1=pick(features, 0),
                feature_2=pick(features, 1),
                feature_3=pick(features, 2),
                feature_4=pick(features, 3),
                images_1=pick(images_to_save, 0),
                images_2=pick(images_to_save, 1),
                images_3=pick(images_to_save, 2),
                images_4=pick(images_to_save, 3)
            )

            product_pipeline.add_data(item_data)
            product_pipeline.close_pipeline()
            success = True
        except Exception as e:
            if driver is not None:
                # The screenshot is a debugging aid only; it must not raise out
                # of the handler and end the retry loop.
                try:
                    driver.save_screenshot("PARSE_ERROR.png")
                except Exception as screenshot_error:
                    logger.warning(f"Could not save PARSE_ERROR.png: {screenshot_error}")
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1
        finally:
            if driver is not None:
                driver.quit()
    return None


def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    """Run ``parse_product`` on a thread pool for every row of ``csv_filename``.

    Args:
        csv_filename: CSV file whose rows are passed to ``parse_product`` as dicts.
        location: Country code passed to every ``parse_product`` call.
        retries: Retry budget passed to every ``parse_product`` call.
        threads: Maximum number of worker threads.
    """
    # Read with the same encoding and newline handling DataPipeline.save_to_csv
    # uses when writing, instead of the platform defaults.
    with open(csv_filename, newline='', encoding='utf-8') as csvfile:
        reader = list(csv.DictReader(csvfile))

    with ThreadPoolExecutor(max_workers=threads) as executor:
        executor.map(parse_product, reader, [location] * len(reader), [retries] * len(reader))




if __name__ == "__main__":

    # Run configuration.
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []
    MAX_RETRIES = 2
    PAGES = 1
    MAX_THREADS = 3
    LOCATION = "us"

    # Phase 1: crawl the search results; each term is written to "<term>.csv".
    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        AGGREGATE_PRODUCTS.append(f"{product}.csv")

    # Phase 2: visit the product page of every row in each search CSV.
    for results_csv in AGGREGATE_PRODUCTS:
        threaded_item_lookup(results_csv, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)

The code above gives you a production ready Selenium scraper for Amazon... fully integrated with the ScrapeOps Proxy API!

  • To change your results, simply change your constants.
  • If you want detailed results on only one page of a search, change PAGES to 1.
  • If you wish to run with 10 threads, change MAX_THREADS to 10... use caution with this one, each thread opens up another page in the proxy and ScrapeOps proxy does have a concurrency limit.

How To Architect Our Amazon Scraper

When we scrape Amazon, we need to pull valuable data from both our search results and individual item pages. When we search on Amazon, we get a bunch of pages and each page has a bunch of results.

Each item in our search also has its own page containing specific details about the item. You can get a better feel for these things if you take a look at the images below.

Results Page Amazon Search Results Page

Our Results page holds most of the information we want to scrape such as the product name, sale price, real price, and rating.

Product Page Amazon Product Page The Product page holds much of the information we already find in the Result page and more. In our case specifically, we need the Product page because it holds bullet points and images specific to the item we're looking at.

When we review our results, we find phones we're interested in. When we want to look at details for a specific phone, we look at the page for that phone.


Understanding How To Scrape Amazon

Before plunging head first into code, we're going to talk about how our scraper works on a high level. In this section, we're going over the required steps in greater detail. If you've got some experience in web scraping already, feel free to skip this section.

Step 1: How To Request Amazon Pages

Let's take a better look at the URL from the page we looked at earlier.

alt text

https://www.amazon.com/s?k=phone is the portion you really need to pay attention to.

  • https://www.amazon.com/ is our base URL.
  • s? shows that we're performing a search query.
  • k=phone tells the Amazon server that we want to look at phones.

Their server takes all this information from the URL and sends us back a page of phones.


Step 2: How To Extract Data From Amazon Pages

While some sites store their data conveniently in a JSON blob, Amazon does not. Amazon nests their data deeply within divs and spans. To extract our data, we need to pull it from these elements nested within the HTML.

Let's first take a look at the Results page. Below, you can see an item title with the inspect window open. If you look closely, you'll see the title text is nested within a span element.

Inspect Results Amazon Search Results

Now, let's take a look at the product page. Look closely here as well. Our feature bullets are actually span elements nested within li (list) elements.

Inspect Product Page


Step 3: How To Control Pagination

Controlling pagination is a pretty simple task. It just requires an additional parameter in our URL.

When pagination is added in, our URL will look like this:

https://www.amazon.com/s?k={product_name}&page={page_number}

So if we want to search page 1 of phones, this would be our URL:

https://www.amazon.com/s?k=phone&page=1

Step 4: Geolocated Data

Amazon does serve different content based on our location. If we're in the US, prices will be denoted in dollars, $. If we're in the UK, Amazon will give us our prices in the pound, GBP.

To control our location effectively, we'll be using the ScrapeOps Proxy API. The ScrapeOps API will route our traffic through servers in whichever country we ask for.

If we want to be in the UK, ScrapeOps will put us in the UK. If we want to be from the US, ScrapeOps will route us through servers in the US.

The ScrapeOps API is a perfect way to control your location because our requests are actually routed through the location we want.


Setting Up Our Amazon Scraper Project

Now that we know what we want to do, let's start building our scraper. First, we'll make a new project folder, and then we'll initialize a virtual environment and install dependencies.

Create a New Folder

mkdir amazon-scraper

From inside your new folder, create a new virtual environment.

Create a New Virtual Environment

python -m venv venv

Activate the Virtual Environment

source venv/bin/activate

Install Dependencies

pip install selenium

Make sure you have Chromedriver installed. You can find the latest version here. If you are using version 115 or higher, installations are much more manageable.


Build An Amazon Search Crawler

The first portion of our project will be spent building a crawler to scrape Amazon search results. This crawler will actually be grabbing the bulk of our data. This crawler needs to:

  • parse results
  • manage result batches using pagination
  • store results from those pages
  • search multiple pages concurrently
  • integrate with a proxy for both location support and anti-bot resistance

Our ideal crawler will fetch a page. It will parse the information from the page to give us good results. Then it'll store those results in files for us to look at later. On top of all these things, it needs to use concurrency for speed and efficiency and it also needs to use a proxy so we don't get blocked.


Step 1: Create Simple Search Data Parser

Let's get started by creating a crawler that simply parses a Results page.

Here is a scraper with a simple parsing function.

  • The parsing function below first finds all the div elements on the page.
  • Then it checks if each div is parsable.
  • If the div is parsable, we use the text of its h2 element as our title.
  • We then extract the following from each listing:
    • asin
    • title
    • url
    • is_ad
    • pricing_unit
    • price
    • real_price
    • rating
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

# Emit INFO-level (and above) log records from this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Chrome options shared by every driver started below; "--headless" runs Chrome without a window.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")

# API key placeholder; replace with your own key.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

def search_products(product_name: str, retries=3):
    """Scrape the first Amazon search-results page for ``product_name``.

    Every priced listing found on the page is printed as a dict.  The page is
    attempted up to ``retries`` times, each time with a fresh Chrome driver;
    failures are logged rather than raised.

    Args:
        product_name: Search term placed in the ``k`` query parameter.
        retries: Maximum number of attempts.
    """
    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so the finally clause cannot hit an unbound
        # name when Chrome itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            url = f"https://www.amazon.com/s?k={product_name}"
            driver.get(url)

            logger.info("Successfully fetched page")

            # Remove the ad-holder containers before collecting the result divs.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue
                h2 = h2s[0]
                title = h2.text

                # Consecutive divs can resolve to the same h2; skip immediate repeats.
                if title == last_title:
                    continue

                # Headings without a link are not product listings.
                anchors = h2.find_elements(By.TAG_NAME, "a")
                if not anchors:
                    continue
                product_url = (anchors[0].get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")

                ad_status = "sspa" in product_url

                # The ASIN is read from a fixed position in the URL path; skip
                # links that are too short to contain it.
                url_array = product_url.split("/")
                if len(url_array) <= 5:
                    continue
                asin = url_array[5]

                price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols_array:
                    continue
                pricing_unit = price_symbols_array[0].text

                # Drop thousands separators and any decimal point carried in the
                # whole part so values such as "1,299." still parse.
                whole_text = div.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
                fraction_text = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
                price = float(f"{whole_text}.{fraction_text}")

                # A listing without a rating element keeps an empty rating
                # instead of aborting the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else ""

                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "")
                    real_price = float(real_price_str)
                else:
                    real_price = price

                product = {
                    "name": asin,
                    "title": title,
                    "url": product_url,
                    "is_ad": ad_status,
                    "pricing_unit": pricing_unit,
                    "price": price,
                    "real_price": real_price,
                    "rating": rating
                }

                print(product)

                last_title = title

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")



if __name__ == "__main__":

    PRODUCTS = ["phone"]
    MAX_RETRIES = 2

    for product in PRODUCTS:
        # Pass the configured retry budget instead of silently using the function default.
        search_products(product, retries=MAX_RETRIES)

If you run this example, you'll probably get blocked.

Amazon will likely continue to block us because we appear abnormal. We'll address this later on in our scraper when we add proxy support.


Step 2: Add Pagination

Now that we can parse a page, let's add pagination into our parsing function. Pagination gives us the ability to control our result batches. If we want page 1, fetch page 1. If we want page 2, fetch page 2... and so on and so forth.

The code example below is almost exactly the same as before. The major difference: we have a page_number added to both our function arguments and our url.

from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

# Emit INFO-level (and above) log records from this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Chrome options shared by every driver started below; "--headless" runs Chrome without a window.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")

# API key placeholder; replace with your own key.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"


def search_products(product_name: str, page_number=1, retries=3):
    """Scrape one Amazon search-results page for ``product_name``.

    Every priced listing found on the page is printed as a dict.  The page is
    attempted up to ``retries`` times, each time with a fresh Chrome driver;
    failures are logged rather than raised.

    Args:
        product_name: Search term placed in the ``k`` query parameter.
        page_number: Results page to request.
        retries: Maximum number of attempts.
    """
    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so the finally clause cannot hit an unbound
        # name when Chrome itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            url = f"https://www.amazon.com/s?k={product_name}&page={page_number}"
            driver.get(url)

            logger.info("Successfully fetched page")

            # Remove the ad-holder containers before collecting the result divs.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue
                h2 = h2s[0]
                title = h2.text

                # Consecutive divs can resolve to the same h2; skip immediate repeats.
                if title == last_title:
                    continue

                # Headings without a link are not product listings.
                anchors = h2.find_elements(By.TAG_NAME, "a")
                if not anchors:
                    continue
                product_url = (anchors[0].get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")

                ad_status = "sspa" in product_url

                # The ASIN is read from a fixed position in the URL path; skip
                # links that are too short to contain it.
                url_array = product_url.split("/")
                if len(url_array) <= 5:
                    continue
                asin = url_array[5]

                price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols_array:
                    continue
                pricing_unit = price_symbols_array[0].text

                # Drop thousands separators and any decimal point carried in the
                # whole part so values such as "1,299." still parse.
                whole_text = div.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
                fraction_text = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
                price = float(f"{whole_text}.{fraction_text}")

                # A listing without a rating element keeps an empty rating
                # instead of aborting the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else ""

                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "")
                    real_price = float(real_price_str)
                else:
                    real_price = price

                product = {
                    "name": asin,
                    "title": title,
                    "url": product_url,
                    "is_ad": ad_status,
                    "pricing_unit": pricing_unit,
                    "price": price,
                    "real_price": real_price,
                    "rating": rating
                }

                print(product)

                last_title = title

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")



if __name__ == "__main__":

    PRODUCTS = ["phone"]
    MAX_RETRIES = 2
    PAGE = 2

    for product in PRODUCTS:
        # Pass the configured retry budget instead of silently using the function default.
        search_products(product, page_number=PAGE, retries=MAX_RETRIES)

As you can see above, not much has changed at all in our code. Our function now takes a page_number and inserts it into our url.


Step 3: Storing the Scraped Data

Now that our crawler can choose a page to scrape, it's time to give it the ability to store our data.

In this section, we'll add a couple classes to do just that: ProductData and DataPipeline.

  • ProductData simply holds information from the objects we scrape.
  • DataPipeline does the job of filtering out duplicates and safely storing our data.

Here is our updated code example.

from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor


# Emit INFO-level (and above) log records from this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Chrome options shared by every driver started below; "--headless" runs Chrome without a window.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")

# API key placeholder; replace with your own key.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One listing scraped from an Amazon search-results page.

    After construction every string field is normalised: an empty string is
    replaced by the placeholder ``"No <field name>"`` and any other string is
    stripped of surrounding whitespace.  Non-string values are left untouched.
    """

    name: str = ""
    title: str = ""
    # The defaults below must not end in a comma: `url: str = "",` makes the
    # default the tuple ("",), which skips string normalisation and would be
    # written to the CSV as a tuple repr.
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            # Only string values are normalised; numbers, booleans and None pass through.
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())


class DataPipeline:
    """Buffer scraped records, drop duplicates by ``name`` and append them to a CSV file."""

    def __init__(self, csv_filename='', storage_queue_limit=50):
        """Create an empty pipeline.

        Args:
            csv_filename: Path of the CSV file that records are appended to.
            storage_queue_limit: Number of queued records that triggers a flush.
        """
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always reset the flag, including on the empty-queue return and on
            # write errors; a flag stuck at True would stop add_data() from
            # ever flushing again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True (and log a warning) if ``input_data.name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a record unless it is a duplicate; flush once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once when scraping is finished."""
        if self.csv_file_open:
            # Imported locally: the file's import block does not bring in `time`,
            # so the bare time.sleep() call raised NameError.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def search_products(product_name: str, page_number=1, retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page for ``product_name``.

    Every priced listing found on the page is turned into a ``ProductData``
    record and handed to ``data_pipeline.add_data``.  The page is attempted up
    to ``retries`` times, each time with a fresh Chrome driver; failures are
    logged rather than raised.

    Args:
        product_name: Search term placed in the ``k`` query parameter.
        page_number: Results page to request.
        retries: Maximum number of attempts.
        data_pipeline: Object whose ``add_data`` method receives each record.
    """
    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so the finally clause cannot hit an unbound
        # name when Chrome itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            url = f"https://www.amazon.com/s?k={product_name}&page={page_number}"
            driver.get(url)

            logger.info("Successfully fetched page")

            # Remove the ad-holder containers before collecting the result divs.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue
                h2 = h2s[0]
                title = h2.text

                # Consecutive divs can resolve to the same h2; skip immediate repeats.
                if title == last_title:
                    continue

                # Headings without a link are not product listings.
                anchors = h2.find_elements(By.TAG_NAME, "a")
                if not anchors:
                    continue
                product_url = (anchors[0].get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")

                ad_status = "sspa" in product_url

                # The ASIN is read from a fixed position in the URL path; skip
                # links that are too short to contain it.
                url_array = product_url.split("/")
                if len(url_array) <= 5:
                    continue
                asin = url_array[5]

                price_symbols_array = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols_array:
                    continue
                pricing_unit = price_symbols_array[0].text

                # Drop thousands separators and any decimal point carried in the
                # whole part so values such as "1,299." still parse.
                whole_text = div.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
                fraction_text = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
                price = float(f"{whole_text}.{fraction_text}")

                # A listing without a rating element keeps an empty rating
                # instead of aborting the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else ""

                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "")
                    real_price = float(real_price_str)
                else:
                    real_price = price

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )

                data_pipeline.add_data(product)

                last_title = title

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")




if __name__ == "__main__":

    # Run configuration.
    PRODUCTS = ["phone"]
    MAX_RETRIES = 2

    # One pipeline (and one "<term>.csv" file) per search term, closed once the term is done.
    for product in PRODUCTS:
        product_pipeline = DataPipeline(csv_filename=f"{product}.csv")
        search_products(
            product,
            retries=MAX_RETRIES,
            data_pipeline=product_pipeline,
        )
        product_pipeline.close_pipeline()

In the example above, we add our ProductData class to hold individual product data. We add a DataPipeline as well.

Our DataPipeline does all the heavy lifting of removing duplicates and saving our information to a CSV file.


Step 4: Adding Concurrency

When we added pagination earlier, we gave our crawler the ability to scrape different pages. Now that we can scrape a specific page and store its data, it's time to give our crawler the power to scrape a bunch of pages at once. With concurrency, we can do exactly that.

Here is our threaded_search() function.

def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1..``pages`` for ``product_name`` on a thread pool.

    All workers share one DataPipeline writing to "<product_name>.csv"; the
    pipeline is closed after the pool has shut down.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    page_numbers = list(range(1, pages+1))
    page_count = len(page_numbers)

    # Each positional iterable supplies one argument of search_products per page.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(
            search_products,
            [product_name] * page_count,
            page_numbers,
            [location] * page_count,
            [retries] * page_count,
            [search_pipeline] * page_count
        )

    search_pipeline.close_pipeline()

We use ThreadPoolExecutor to manage our threads. This function will use 5 threads by default when performing searches, so we'll have a maximum of 5 searches going simultaneously. Be mindful when choosing how many threads to use. Not only does your machine have limits, but your ScrapeOps API key will likely also have a concurrency limit. You don't want to run threads past your limit... you'd just be wasting resources!

Here is our updated code. We also added a location argument to search_products(). While we don't use the location in this example, we'll be using it in the next section when we add proxy support.

from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor


# Emit INFO-level (and above) log records from this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Chrome options shared by every driver started below; "--headless" runs Chrome without a window.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")

# API key placeholder; replace with your own key.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One listing scraped from an Amazon search-results page.

    After construction every string field is normalised: an empty string is
    replaced by the placeholder ``"No <field name>"`` and any other string is
    stripped of surrounding whitespace.  Non-string values are left untouched.
    """

    name: str = ""
    title: str = ""
    # The defaults below must not end in a comma: `url: str = "",` makes the
    # default the tuple ("",), which skips string normalisation and would be
    # written to the CSV as a tuple repr.
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            # Only string values are normalised; numbers, booleans and None pass through.
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())


class DataPipeline:
    """Buffer scraped records, drop duplicates by ``name`` and append them to a CSV file."""

    def __init__(self, csv_filename='', storage_queue_limit=50):
        """Create an empty pipeline.

        Args:
            csv_filename: Path of the CSV file that records are appended to.
            storage_queue_limit: Number of queued records that triggers a flush.
        """
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always reset the flag, including on the empty-queue return and on
            # write errors; a flag stuck at True would stop add_data() from
            # ever flushing again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True (and log a warning) if ``input_data.name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a record unless it is a duplicate; flush once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once when scraping is finished."""
        if self.csv_file_open:
            # Imported locally: the file's import block does not bring in `time`,
            # so the bare time.sleep() call raised NameError.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page into ``data_pipeline``.

    Args:
        product_name: Search term, sent as the ``k`` query parameter.
        page_number: Results page to request.
        location: Not used by this version; kept so callers can already
            pass it.
        retries: Maximum number of attempts for this page.
        data_pipeline: Receives one ``ProductData`` per parsed product via
            ``add_data()``.

    Returns:
        None. A failed attempt is logged and retried, not raised.
    """
    # urlencode escapes spaces and reserved characters in the search term.
    query = urlencode({"k": product_name, "page": page_number})
    url = f"https://www.amazon.com/s?{query}"

    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so ``finally`` cannot hit an unbound name when
        # the browser itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)

            logger.info("Successfully fetched page")

            # Remove the ad-holder divs from the DOM before parsing.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue

                h2 = h2s[0]
                title = h2.text
                # Skip a div whose first heading was just recorded.
                if title == last_title:
                    continue

                a = h2.find_element(By.TAG_NAME, "a")
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                asin = product_url.split("/")[5]

                # Divs without a price symbol are skipped.
                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text

                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction")
                # Drop thousands separators ("1,299.99") before converting.
                price = float(f"{price_whole.text}.{price_decimal.text}".replace(",", ""))

                # A div without a rating element yields rating=None instead of
                # raising and failing the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else None

                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "")
                    real_price = float(real_price_str)
                else:
                    real_price = price

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)

                last_title = title

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")

def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1..``pages`` for ``product_name`` on a thread pool.

    All workers share one ``DataPipeline`` writing to ``<product_name>.csv``;
    the pipeline is closed once the pool has shut down.
    """
    pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    page_numbers = list(range(1, pages+1))
    count = len(page_numbers)

    # executor.map pairs the argument lists element-wise, one call per page.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pool.map(
            search_products,
            [product_name] * count,
            page_numbers,
            [location] * count,
            [retries] * count,
            [pipeline] * count
        )

    pipeline.close_pipeline()




if __name__ == "__main__":

    # Crawl settings.
    PRODUCTS = ["phone"]
    MAX_RETRIES = 2
    PAGES = 2
    MAX_THREADS = 3
    LOCATION = "us"

    # Each product is written to "<product>.csv" by threaded_search().
    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)

We're almost ready, but not quite. If you run the code above, you'll still most likely get blocked. To an Amazon server, our scraper already looked a bit abnormal. Now it's not only abnormal, it's also requesting pages far faster than it was before. Let's add proxy support in the next section.


Step 5: Bypassing Anti-Bots

We're almost ready for our production run. It's time to add proxy support so Amazon stops blocking our crawler. We really only need to add one function here, get_scrapeops_url().

This function takes in a regular URL and URL-encodes it, along with our API key and location, into the query string of a ScrapeOps proxy URL. Take a look below:

def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url`` from ``location``."""
    # The target URL travels as an encoded query parameter of the proxy endpoint.
    query = urlencode({
        "api_key": API_KEY,
        "url": url,
        "country": location
    })
    return "https://proxy.scrapeops.io/v1/?" + query

This function takes our url and formats it into a proxied url. Here is our updated code below.

from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor


# Emit INFO-and-above log records; `logger` is the module-level logger used below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Chrome options passed to every webdriver.Chrome(...) call: run headless.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")

# Placeholder key sent as "api_key" by get_scrapeops_url(); replace with your own.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url`` from ``location``."""
    # The target URL travels as an encoded query parameter of the proxy endpoint.
    query = urlencode({
        "api_key": API_KEY,
        "url": url,
        "country": location
    })
    return "https://proxy.scrapeops.io/v1/?" + query


@dataclass
class ProductData:
    """One product scraped from an Amazon search-results page.

    String fields are normalised on construction: an empty string becomes
    ``"No <field name>"`` and any other string is stripped of surrounding
    whitespace.
    """

    # NOTE: the defaults must not end with a comma -- ``url: str = "",``
    # would make the default the one-element tuple ("",).
    name: str = ""
    title: str = ""
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        for f in fields(self):
            value = getattr(self, f.name)
            # Only string values are touched; None, bools and floats pass through.
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, f.name, f"No {f.name}")
            else:
                setattr(self, f.name, value.strip())


class DataPipeline:
    """Drop duplicate items and append the rest to a CSV file in batches.

    Items are buffered in ``storage_queue``. The buffer is flushed to
    ``csv_filename`` once it holds ``storage_queue_limit`` items and again
    when ``close_pipeline()`` is called.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue."""
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [f.name for f in fields(data_to_save[0])]
            # Write the header only when the file is new or empty.
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag, including on the empty-queue early return
            # and on a failed write; otherwise add_data() never flushes again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before, else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued."""
        # Imported here because the file's import block does not provide `time`.
        import time

        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()

def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page, via the proxy, into ``data_pipeline``.

    Args:
        product_name: Search term, sent as the ``k`` query parameter.
        page_number: Results page to request.
        location: Country code forwarded to ``get_scrapeops_url()``.
        retries: Maximum number of attempts for this page.
        data_pipeline: Receives one ``ProductData`` per parsed product via
            ``add_data()``.

    Returns:
        None. A failed attempt is logged and retried, not raised.
    """
    # urlencode escapes spaces and reserved characters in the search term.
    query = urlencode({"k": product_name, "page": page_number})
    url = f"https://www.amazon.com/s?{query}"

    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so ``finally`` cannot hit an unbound name when
        # the browser itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            # Forward the caller's location instead of always using the default.
            driver.get(get_scrapeops_url(url, location=location))

            logger.info("Successfully fetched page")

            # Remove the ad-holder divs from the DOM before parsing.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue

                h2 = h2s[0]
                title = h2.text
                # Skip a div whose first heading was just recorded.
                if title == last_title:
                    continue

                a = h2.find_element(By.TAG_NAME, "a")
                # Rewrite the proxy host back to Amazon -- presumably because
                # links resolve against the proxy when fetched through it.
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                asin = product_url.split("/")[5]

                # Divs without a price symbol are skipped.
                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text

                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction")
                # Drop thousands separators ("1,299.99") before converting.
                price = float(f"{price_whole.text}.{price_decimal.text}".replace(",", ""))

                # A div without a rating element yields rating=None instead of
                # raising and failing the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else None

                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "")
                    real_price = float(real_price_str)
                else:
                    real_price = price

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)

                last_title = title

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")

def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1..``pages`` for ``product_name`` on a thread pool.

    All workers share one ``DataPipeline`` writing to ``<product_name>.csv``;
    the pipeline is closed once the pool has shut down.
    """
    pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    page_numbers = list(range(1, pages+1))
    count = len(page_numbers)

    # executor.map pairs the argument lists element-wise, one call per page.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pool.map(
            search_products,
            [product_name] * count,
            page_numbers,
            [location] * count,
            [retries] * count,
            [pipeline] * count
        )

    pipeline.close_pipeline()


if __name__ == "__main__":

    # Crawl settings.
    PRODUCTS = ["phone"]
    MAX_RETRIES = 2
    PAGES = 2
    MAX_THREADS = 3
    LOCATION = "us"

    # Each product is written to "<product>.csv" by threaded_search().
    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)

Now that we can get past anti-bots, we're ready to move on to our production run.


Step 6: Production Run

Time for our production run. Take a look at our main function below.

if __name__ == "__main__":

    # Production-run settings.
    PRODUCTS = ["phone"]
    MAX_RETRIES = 4
    PAGES = 3
    MAX_THREADS = 3
    LOCATION = "us"

    # Each product is written to "<product>.csv" by threaded_search().
    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)

You can change any of the following constants to change your results:

  • PRODUCTS
  • MAX_RETRIES
  • PAGES
  • MAX_THREADS
  • LOCATION

To run this scraper, replace the filename below with whatever you chose to name yours.

python crawler-proxy.py

Our final crawler generated a report on 3 pages full of phones in 52.5 seconds. When running in production, be cautious of your MAX_THREADS.

Selenium can be vulnerable to both thread locking and "stale elements". If you are noticing stale element errors, decrease your MAX_THREADS. Each thread is running its own browser and this can get resource intensive.

Crawler Performance Results

Here is the report it created:

Crawler Report as CSV File


Build An Amazon Product Scraper

Now it's time to build a scraper that looks up individual products. From these individual product pages, we need to extract feature bullets, prices, and images. This way, if you're interested in a product, simply pull up your report for that product!


Step 1: Create Simple Amazon Product Page Data Parser

Here's a parsing function that retrieves data from a product page. We're not ready to add it into our scraper because we need the ability to read the CSV we created earlier.

def parse_product(product_object, location="us", retries=3):
    """Open one product page and print its price, feature bullets and images.

    Args:
        product_object: Mapping with a ``"url"`` key. The title and ASIN are
            taken from the 4th-from-last and 2nd-from-last ``/``-separated
            URL segments.
        location: Not used by this version; kept so callers can already
            pass it.
        retries: Failed attempts tolerated after the first one; the page is
            tried at most ``retries + 1`` times.

    Returns:
        None. The parsed record is printed.
    """
    product_url = product_object["url"]
    url_array = product_url.split("/")
    title = url_array[-4]
    asin = url_array[-2]

    image_prefix = "https://m.media-amazon.com/images/I/"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(product_url)

            images_to_save = []
            for image in driver.find_elements(By.CSS_SELECTOR, "li img"):
                image_link = image.get_attribute("src")
                # Guard against a missing src (None) before the substring test.
                if image_link and image_prefix in image_link and image_link not in images_to_save:
                    images_to_save.append(image_link)

            features = []
            for feature in driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini"):
                text = feature.find_element(By.TAG_NAME, "span").text
                if text not in features:
                    features.append(text)

            price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text
            whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
            decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
            price = float(f"{whole_number}.{decimal}")

            if not images_to_save or not features:
                # Count an empty page as a failed attempt so the retry
                # counter always advances and the loop terminates.
                raise ValueError("no product images or feature bullets found")

            # Pad to four columns so every record has the same keys.
            feature_cols = (features + ["n/a"] * 4)[:4]
            image_cols = (images_to_save + ["n/a"] * 4)[:4]
            item_data = {
                "name": asin,
                "title": title,
                "url": product_url,
                "pricing_unit": price_symbol,
                "price": price,
                "feature_1": feature_cols[0],
                "feature_2": feature_cols[1],
                "feature_3": feature_cols[2],
                "feature_4": feature_cols[3],
                "images_1": image_cols[0],
                "images_2": image_cols[1],
                "images_3": image_cols[2],
                "images_4": image_cols[3]
            }

            print(item_data)

            success = True
        except Exception as e:
            driver.save_screenshot("PARSE_ERROR.png")
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()
    return None

In the above function, we pull the features and item images from the product page. These will be used in the individual report we generate for each product.


Step 2: Loading URLs To Scrape

Now it's time to give our code the ability to run. In order to parse these items, we need to read them from a CSV file and then pass them into our parse function. The code example below adds a threaded_item_lookup() function.

At the moment, this function does not use threading. We just have a for loop as a placeholder.

This function reads the CSV file and then passes each object from the file into parse_product().

from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

# Emit INFO-and-above log records; `logger` is the module-level logger used below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Chrome options passed to every webdriver.Chrome(...) call: run headless.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")

# Placeholder API key; replace it with your own before it is needed.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One product scraped from an Amazon search-results page.

    String fields are normalised on construction: an empty string becomes
    ``"No <field name>"`` and any other string is stripped of surrounding
    whitespace.
    """

    # NOTE: the defaults must not end with a comma -- ``url: str = "",``
    # would make the default the one-element tuple ("",).
    name: str = ""
    title: str = ""
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        for f in fields(self):
            value = getattr(self, f.name)
            # Only string values are touched; None, bools and floats pass through.
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, f.name, f"No {f.name}")
            else:
                setattr(self, f.name, value.strip())


class DataPipeline:
    """Drop duplicate items and append the rest to a CSV file in batches.

    Items are buffered in ``storage_queue``. The buffer is flushed to
    ``csv_filename`` once it holds ``storage_queue_limit`` items and again
    when ``close_pipeline()`` is called.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue."""
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [f.name for f in fields(data_to_save[0])]
            # Write the header only when the file is new or empty.
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag, including on the empty-queue early return
            # and on a failed write; otherwise add_data() never flushes again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before, else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued."""
        # Imported here because the file's import block does not provide `time`.
        import time

        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()


def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page into ``data_pipeline``.

    Args:
        product_name: Search term, sent as the ``k`` query parameter.
        page_number: Results page to request.
        location: Not used by this version; kept so callers can already
            pass it.
        retries: Maximum number of attempts for this page.
        data_pipeline: Receives one ``ProductData`` per parsed product via
            ``add_data()``.

    Returns:
        None. A failed attempt is logged and retried, not raised.
    """
    # urlencode escapes spaces and reserved characters in the search term.
    query = urlencode({"k": product_name, "page": page_number})
    url = f"https://www.amazon.com/s?{query}"

    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so ``finally`` cannot hit an unbound name when
        # the browser itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)

            logger.info("Successfully fetched page")

            # Remove the ad-holder divs from the DOM before parsing.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue

                h2 = h2s[0]
                title = h2.text
                # Skip a div whose first heading was just recorded.
                if title == last_title:
                    continue

                a = h2.find_element(By.TAG_NAME, "a")
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                asin = product_url.split("/")[5]

                # Divs without a price symbol are skipped.
                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text

                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction")
                # Drop thousands separators ("1,299.99") before converting.
                price = float(f"{price_whole.text}.{price_decimal.text}".replace(",", ""))

                # A div without a rating element yields rating=None instead of
                # raising and failing the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else None

                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "")
                    real_price = float(real_price_str)
                else:
                    real_price = price

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)

                last_title = title

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")


def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1..``pages`` for ``product_name`` on a thread pool.

    All workers share one ``DataPipeline`` writing to ``<product_name>.csv``;
    the pipeline is closed once the pool has shut down.
    """
    pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    page_numbers = list(range(1, pages+1))
    count = len(page_numbers)

    # executor.map pairs the argument lists element-wise, one call per page.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pool.map(
            search_products,
            [product_name] * count,
            page_numbers,
            [location] * count,
            [retries] * count,
            [pipeline] * count
        )

    pipeline.close_pipeline()


def parse_product(product_object, location="us", retries=3):
    """Open one product page and print its price, feature bullets and images.

    Args:
        product_object: Mapping with a ``"url"`` key. The title and ASIN are
            taken from the 4th-from-last and 2nd-from-last ``/``-separated
            URL segments.
        location: Not used by this version; kept so callers can already
            pass it.
        retries: Failed attempts tolerated after the first one; the page is
            tried at most ``retries + 1`` times.

    Returns:
        None. The parsed record is printed.
    """
    product_url = product_object["url"]
    url_array = product_url.split("/")
    title = url_array[-4]
    asin = url_array[-2]

    image_prefix = "https://m.media-amazon.com/images/I/"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(product_url)

            images_to_save = []
            for image in driver.find_elements(By.CSS_SELECTOR, "li img"):
                image_link = image.get_attribute("src")
                # Guard against a missing src (None) before the substring test.
                if image_link and image_prefix in image_link and image_link not in images_to_save:
                    images_to_save.append(image_link)

            features = []
            for feature in driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini"):
                text = feature.find_element(By.TAG_NAME, "span").text
                if text not in features:
                    features.append(text)

            price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text
            whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
            decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
            price = float(f"{whole_number}.{decimal}")

            if not images_to_save or not features:
                # Count an empty page as a failed attempt so the retry
                # counter always advances and the loop terminates.
                raise ValueError("no product images or feature bullets found")

            # Pad to four columns so every record has the same keys.
            feature_cols = (features + ["n/a"] * 4)[:4]
            image_cols = (images_to_save + ["n/a"] * 4)[:4]
            item_data = {
                "name": asin,
                "title": title,
                "url": product_url,
                "pricing_unit": price_symbol,
                "price": price,
                "feature_1": feature_cols[0],
                "feature_2": feature_cols[1],
                "feature_3": feature_cols[2],
                "feature_4": feature_cols[3],
                "images_1": image_cols[0],
                "images_2": image_cols[1],
                "images_3": image_cols[2],
                "images_4": image_cols[3]
            }

            print(item_data)

            success = True
        except Exception as e:
            driver.save_screenshot("PARSE_ERROR.png")
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()
    return None


def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    """Read a CSV report and run ``parse_product()`` on every row.

    ``threads`` is accepted but not used yet: rows are processed one at a
    time. ``location`` and ``retries`` are forwarded to ``parse_product()``.
    """
    # newline="" is what the csv module expects for file objects; utf-8
    # matches the encoding DataPipeline.save_to_csv() writes with.
    with open(csv_filename, newline="", encoding="utf-8") as csvfile:
        rows = list(csv.DictReader(csvfile))

    for row in rows:
        parse_product(row, location=location, retries=retries)


if __name__ == "__main__":

    # Run settings.
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []
    MAX_RETRIES = 2
    PAGES = 1
    MAX_THREADS = 3
    LOCATION = "us"

    # Stage 1: crawl the search results; remember each report's filename.
    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        AGGREGATE_PRODUCTS.append(f"{product}.csv")

    # Stage 2: look up every product listed in each report.
    for report in AGGREGATE_PRODUCTS:
        threaded_item_lookup(report, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)

You might be wondering why we use a separate field for each bullet point and image. The reason for this is actually pretty simple: the csv module writes flat rows and has a very difficult time managing lists, and once we convert this item into a @dataclass, each field maps to exactly one CSV column, so a single field can't hold a variable number of values. In Python, lists are mutable and can grow to any size.


Step 3: Storing the Scraped Data

Similar to how we stored our data with the results crawler, we're going to be using a custom class to hold our data. We'll then pass this object into our DataPipeline to both filter and store our data.

The code below adds a ProductPageData class and passes it into our new pipeline for safe storage.

from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor

# Emit INFO-and-above log records; `logger` is the module-level logger used below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Chrome options passed to every webdriver.Chrome(...) call: run headless.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")

# Placeholder API key; replace it with your own before it is needed.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"


@dataclass
class ProductData:
    """One product scraped from an Amazon search-results page.

    String fields are normalised on construction: an empty string becomes
    ``"No <field name>"`` and any other string is stripped of surrounding
    whitespace.
    """

    # NOTE: the defaults must not end with a comma -- ``url: str = "",``
    # would make the default the one-element tuple ("",).
    name: str = ""
    title: str = ""
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        for f in fields(self):
            value = getattr(self, f.name)
            # Only string values are touched; None, bools and floats pass through.
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, f.name, f"No {f.name}")
            else:
                setattr(self, f.name, value.strip())

@dataclass
class ProductPageData:
    """Details scraped from one Amazon product page.

    Feature bullets and image links each occupy four fixed fields. String
    fields are normalised on construction: an empty string becomes
    ``"No <field name>"`` and any other string is stripped of surrounding
    whitespace.
    """

    # NOTE: the defaults must not end with a comma -- ``url: str = "",``
    # would make the default the one-element tuple ("",).
    name: str = ""
    title: str = ""
    url: str = ""
    pricing_unit: str = ""
    price: float = None
    feature_1: str = ""
    feature_2: str = ""
    feature_3: str = ""
    feature_4: str = ""
    images_1: str = ""
    images_2: str = ""
    images_3: str = ""
    images_4: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        for f in fields(self):
            value = getattr(self, f.name)
            # Only string values are touched; None and floats pass through.
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, f.name, f"No {f.name}")
            else:
                setattr(self, f.name, value.strip())


class DataPipeline:
    """Drop duplicate items and append the rest to a CSV file in batches.

    Items are buffered in ``storage_queue``. The buffer is flushed to
    ``csv_filename`` once it holds ``storage_queue_limit`` items and again
    when ``close_pipeline()`` is called.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue."""
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [f.name for f in fields(data_to_save[0])]
            # Write the header only when the file is new or empty.
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag, including on the empty-queue early return
            # and on a failed write; otherwise add_data() never flushes again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before, else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued."""
        # Imported here because the file's import block does not provide `time`.
        import time

        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()


def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page into ``data_pipeline``.

    Args:
        product_name: Search term, sent as the ``k`` query parameter.
        page_number: Results page to request.
        location: Not used by this version; kept so callers can already
            pass it.
        retries: Maximum number of attempts for this page.
        data_pipeline: Receives one ``ProductData`` per parsed product via
            ``add_data()``.

    Returns:
        None. A failed attempt is logged and retried, not raised.
    """
    # urlencode escapes spaces and reserved characters in the search term.
    query = urlencode({"k": product_name, "page": page_number})
    url = f"https://www.amazon.com/s?{query}"

    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so ``finally`` cannot hit an unbound name when
        # the browser itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)

            logger.info("Successfully fetched page")

            # Remove the ad-holder divs from the DOM before parsing.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue

                h2 = h2s[0]
                title = h2.text
                # Skip a div whose first heading was just recorded.
                if title == last_title:
                    continue

                a = h2.find_element(By.TAG_NAME, "a")
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                asin = product_url.split("/")[5]

                # Divs without a price symbol are skipped.
                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text

                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction")
                # Drop thousands separators ("1,299.99") before converting.
                price = float(f"{price_whole.text}.{price_decimal.text}".replace(",", ""))

                # A div without a rating element yields rating=None instead of
                # raising and failing the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else None

                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "")
                    real_price = float(real_price_str)
                else:
                    real_price = price

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)

                last_title = title

            success = True

        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1

        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")


def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1..``pages`` for ``product_name`` on a thread pool.

    All workers share one ``DataPipeline`` writing to ``<product_name>.csv``;
    the pipeline is closed once the pool has shut down.
    """
    pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    page_numbers = list(range(1, pages+1))
    count = len(page_numbers)

    # executor.map pairs the argument lists element-wise, one call per page.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pool.map(
            search_products,
            [product_name] * count,
            page_numbers,
            [location] * count,
            [retries] * count,
            [pipeline] * count
        )

    pipeline.close_pipeline()


def parse_product(product_object, location="us", retries=3):
    """Scrape one product page and save it to its own CSV report.

    The record is stored through a ``DataPipeline`` writing to
    ``<title>.csv``, where the title is taken from the product URL.

    Args:
        product_object: Mapping with a ``"url"`` key. The title and ASIN are
            taken from the 4th-from-last and 2nd-from-last ``/``-separated
            URL segments.
        location: Not used by this version; kept so callers can already
            pass it.
        retries: Failed attempts tolerated after the first one; the page is
            tried at most ``retries + 1`` times.

    Returns:
        None.
    """
    product_url = product_object["url"]
    url_array = product_url.split("/")
    title = url_array[-4]

    print(title)

    product_pipeline = DataPipeline(csv_filename=f"{title}.csv")

    asin = url_array[-2]

    image_prefix = "https://m.media-amazon.com/images/I/"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(product_url)

            images_to_save = []
            for image in driver.find_elements(By.CSS_SELECTOR, "li img"):
                image_link = image.get_attribute("src")
                # Guard against a missing src (None) before the substring test.
                if image_link and image_prefix in image_link and image_link not in images_to_save:
                    images_to_save.append(image_link)

            features = []
            for feature in driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini"):
                text = feature.find_element(By.TAG_NAME, "span").text
                if text not in features:
                    features.append(text)

            price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text
            whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
            decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
            price = float(f"{whole_number}.{decimal}")

            if not images_to_save or not features:
                # Count an empty page as a failed attempt so the retry
                # counter always advances and the loop terminates.
                raise ValueError("no product images or feature bullets found")

            # Pad to four values so every fixed field gets something.
            feature_cols = (features + ["n/a"] * 4)[:4]
            image_cols = (images_to_save + ["n/a"] * 4)[:4]
            item_data = ProductPageData(
                name=asin,
                title=title,
                url=product_url,
                pricing_unit=price_symbol,
                price=price,
                feature_1=feature_cols[0],
                feature_2=feature_cols[1],
                feature_3=feature_cols[2],
                feature_4=feature_cols[3],
                images_1=image_cols[0],
                images_2=image_cols[1],
                images_3=image_cols[2],
                images_4=image_cols[3]
            )

            product_pipeline.add_data(item_data)
            product_pipeline.close_pipeline()
            success = True
        except Exception as e:
            driver.save_screenshot("PARSE_ERROR.png")
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()
    return None


def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    """Read a CSV report and run ``parse_product()`` on every row.

    ``threads`` is accepted but not used yet: rows are processed one at a
    time. ``location`` and ``retries`` are forwarded to ``parse_product()``.
    """
    # newline="" is what the csv module expects for file objects; utf-8
    # matches the encoding DataPipeline.save_to_csv() writes with.
    with open(csv_filename, newline="", encoding="utf-8") as csvfile:
        rows = list(csv.DictReader(csvfile))

    for row in rows:
        parse_product(row, location=location, retries=retries)




if __name__ == "__main__":

    # Run settings.
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []
    MAX_RETRIES = 2
    PAGES = 1
    MAX_THREADS = 3
    LOCATION = "us"

    # Stage 1: crawl the search results; remember each report's filename.
    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        AGGREGATE_PRODUCTS.append(f"{product}.csv")

    # Stage 2: look up every product listed in each report.
    for report in AGGREGATE_PRODUCTS:
        threaded_item_lookup(report, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)

You may have also noticed that from inside parse_product(), we open up an individual pipeline for each product.

This way, we generate an individual report for each one of the products we scraped earlier with the crawler. If you want to see details about a specific item, you can just open the report for that item!!!


Step 4: Adding Concurrency

Now, we're going to add concurrency so we can parse multiple products at once. This is very similar to when we added concurrency to the crawler earlier.

Take a look at the function below, it's the finished version of threaded_item_lookup():

def threaded_item_lookup(csv_filename, location="us", retries=3