How to Scrape Amazon With Selenium
Amazon is the largest online retailer in the world and one of the largest overall retailers in the world. If you're looking for anything online, you'll probably check Amazon first.
Amazon offers an unparalleled wealth of product data and consumer insights, providing numerous opportunities for analysis, market research, and strategic decision-making.
In this guide, we'll take you through how to scrape Amazon using Python Selenium.
- TLDR - How to Scrape Amazon
- How To Architect Our Scraper
- Understanding How To Scrape Amazon
- Setting Up Our Amazon Scraper
- Build a Search Results Crawler
- Build a Product Parser
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
TLDR - How to Scrape Amazon
If you are looking for a production-ready Amazon scraper, follow the script below:
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
# Configure the root logger to emit INFO-level messages and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the pipeline and scraping functions below.
logger = logging.getLogger(__name__)
# Chrome options shared by every driver this script starts; "--headless"
# runs the browser without opening a visible window.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")
# Placeholder value: replace with your own ScrapeOps API key before running.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
    """One product listing scraped from an Amazon search results page.

    After construction every string field is normalised: an empty string is
    replaced with ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace.
    """

    name: str = ""
    title: str = ""
    # These defaults used to end with a trailing comma, which silently turned
    # each one into a one-element tuple (e.g. ``("",)``) instead of a scalar.
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    # The crawler stores the rating element's text here, so this is a string.
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        # The loop variable is deliberately not called ``field`` so it cannot
        # shadow ``dataclasses.field``.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
@dataclass
class ProductPageData:
    """Details scraped from a single Amazon product page.

    Holds the price plus up to four feature bullets and four image URLs.
    After construction every string field is normalised: an empty string is
    replaced with ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace.
    """

    name: str = ""
    title: str = ""
    # These defaults used to end with a trailing comma, which silently turned
    # each one into a one-element tuple (e.g. ``("",)``) instead of a scalar.
    url: str = ""
    pricing_unit: str = ""
    price: float = None
    feature_1: str = ""
    feature_2: str = ""
    feature_3: str = ""
    feature_4: str = ""
    images_1: str = ""
    images_2: str = ""
    images_3: str = ""
    images_4: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        # The loop variable is deliberately not called ``field`` so it cannot
        # shadow ``dataclasses.field``.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
class DataPipeline:
    """Buffer scraped dataclass items, drop duplicates and append them to a CSV.

    Items are de-duplicated on their ``name`` attribute and held in
    ``storage_queue``. When the queue reaches ``storage_queue_limit`` it is
    flushed to ``csv_filename``; ``close_pipeline`` flushes whatever is left.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file does not exist yet or is
        empty. Column names come from the dataclass fields of the first item.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Return before raising the "file open" flag, so an empty flush
            # cannot leave it stuck at True.
            return

        self.csv_file_open = True
        try:
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag even when the write raises.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before, else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush any remaining queued items to the CSV file."""
        # Imported here because the script's import block does not bring in
        # ``time``; without this the sleep below raised NameError.
        import time

        if self.csv_file_open:
            # A write is in progress; give it a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def get_scrapeops_url(url, location="us"):
    """Build the ScrapeOps proxy URL that fetches ``url`` from ``location``.

    The API key, target URL and country are URL-encoded into the query string
    of the proxy endpoint.
    """
    query_string = urlencode({"api_key": API_KEY, "url": url, "country": location})
    return f"https://proxy.scrapeops.io/v1/?{query_string}"
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search results page and queue every priced listing.

    The page is fetched through the ScrapeOps proxy with a headless Chrome
    driver. Each listing that has a title, a product link and a price becomes
    a ``ProductData`` item that is handed to ``data_pipeline.add_data``.

    Args:
        product_name: Search term; it is URL-encoded before being sent.
        page_number: Results page to request.
        location: Country code forwarded to the proxy.
        retries: Maximum number of attempts for this page.
        data_pipeline: Object exposing ``add_data(item)``.

    Any exception raised during an attempt is logged and uses up one try;
    nothing is re-raised to the caller.
    """
    # urlencode escapes spaces and reserved characters in the search term.
    url = "https://www.amazon.com/s?" + urlencode({"k": product_name, "page": page_number})
    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so the finally clause never touches an unbound
        # name (or a driver from an earlier attempt) if Chrome fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(get_scrapeops_url(url, location))
            logger.info("Successfully fetched page")

            # Remove ad containers from the DOM before collecting listings.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    # No heading here. Skipping also prevents reusing the h2
                    # that was found for a previous div.
                    continue
                h2 = h2s[0]
                title = h2.text
                # Nested divs expose the same heading; skip consecutive repeats.
                if title == last_title:
                    continue

                a = h2.find_element(By.TAG_NAME, "a")
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                url_array = product_url.split("/")
                if len(url_array) < 6:
                    # URL too short to contain the segment used as the ASIN.
                    continue
                asin = url_array[5]

                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text

                # Strip thousands separators and any decimal point from the
                # whole part, the same way parse_product does.
                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole").text
                price_whole = price_whole.replace(",", "").replace(".", "")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
                price = float(f"{price_whole}.{price_decimal}")

                # A listing without a rating must not abort the whole page;
                # ProductData turns the empty string into "No rating".
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else ""

                # Fall back to the sale price when no list price is shown.
                real_price = price
                real_price_elements = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_elements:
                    real_price_text = real_price_elements[0].text.replace(pricing_unit, "")
                    real_price_text = real_price_text.replace(",", "").strip()
                    if real_price_text:
                        real_price = float(real_price_text)

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)
                last_title = title

            success = True
        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1
        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1 through ``pages`` for ``product_name`` on a thread pool.

    Every page shares one ``DataPipeline`` that writes to
    ``"<product_name>.csv"``; the pipeline is closed once the pool has
    finished all submitted pages.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # One task per page; leaving the with-block waits for all of them.
        for page_number in range(1, pages + 1):
            executor.submit(
                search_products,
                product_name,
                page_number,
                location,
                retries,
                search_pipeline,
            )

    search_pipeline.close_pipeline()
def parse_product(product_object, location="us", retries=3):
    """Scrape one product page and write its details to ``"<title>.csv"``.

    Args:
        product_object: Mapping with a ``"url"`` key holding the product URL.
        location: Country code forwarded to the proxy.
        retries: Retry budget. The loop runs while ``tries <= retries``, so up
            to ``retries + 1`` attempts are made.

    Returns:
        None. A failed attempt is logged and a screenshot is saved as
        ``PARSE_ERROR.png``; the exception is not re-raised.
    """
    product_url = product_object["url"]
    proxy_url = get_scrapeops_url(product_url, location=location)

    # NOTE(review): presumably the URL looks like .../<title>/dp/<asin>/<ref>,
    # so the title and ASIN sit at fixed offsets from the end; confirm against
    # the URLs the crawler actually stores.
    url_array = product_url.split("/")
    title = url_array[-4]
    asin = url_array[-2]
    print(title)

    product_pipeline = DataPipeline(csv_filename=f"{title}.csv")
    image_prefix = "https://m.media-amazon.com/images/I/"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(proxy_url)

            images_to_save = []
            for image in driver.find_elements(By.CSS_SELECTOR, "li img"):
                image_link = image.get_attribute("src")
                # An <img> without src yields None, which must be skipped
                # rather than tested with ``in``.
                if image_link and image_prefix in image_link and image_link not in images_to_save:
                    images_to_save.append(image_link)

            features = []
            for feature in driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini"):
                text = feature.find_element(By.TAG_NAME, "span").text
                if text not in features:
                    features.append(text)

            price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text
            whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text
            whole_number = whole_number.replace(",", "").replace(".", "")
            decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
            price = float(f"{whole_number}.{decimal}")

            if not images_to_save or not features:
                # Count an empty page as a failed attempt, so the loop always
                # either succeeds or uses up a try.
                raise ValueError("no product images or feature bullets found")

            # Pad to four entries so missing slots become "n/a".
            padded_features = (features + ["n/a"] * 4)[:4]
            padded_images = (images_to_save + ["n/a"] * 4)[:4]
            item_data = ProductPageData(
                name=asin,
                title=title,
                url=product_url,
                pricing_unit=price_symbol,
                price=price,
                feature_1=padded_features[0],
                feature_2=padded_features[1],
                feature_3=padded_features[2],
                feature_4=padded_features[3],
                images_1=padded_images[0],
                images_2=padded_images[1],
                images_3=padded_images[2],
                images_4=padded_images[3]
            )
            product_pipeline.add_data(item_data)
            product_pipeline.close_pipeline()
            success = True
        except Exception as e:
            driver.save_screenshot("PARSE_ERROR.png")
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()
    return None
def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    """Run ``parse_product`` for every row of ``csv_filename`` on a thread pool.

    Args:
        csv_filename: CSV file whose rows are passed to ``parse_product`` as
            dictionaries keyed by the header row.
        location: Country code forwarded to each ``parse_product`` call.
        retries: Retry budget forwarded to each ``parse_product`` call.
        threads: Number of worker threads.
    """
    # newline='' and utf-8 mirror how DataPipeline.save_to_csv writes the
    # file, so quoted fields and non-ASCII titles are read back intact.
    with open(csv_filename, newline='', encoding='utf-8') as csvfile:
        reader = list(csv.DictReader(csvfile))

    with ThreadPoolExecutor(max_workers=threads) as executor:
        executor.map(parse_product, reader, [location] * len(reader), [retries] * len(reader))
if __name__ == "__main__":
    # Run settings: change these constants to change what gets scraped.
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []
    MAX_RETRIES = 2
    PAGES = 1
    MAX_THREADS = 3
    LOCATION = "us"

    # Phase 1: crawl the search results of each term into "<term>.csv".
    for search_term in PRODUCTS:
        threaded_search(
            search_term,
            PAGES,
            max_workers=MAX_THREADS,
            retries=MAX_RETRIES,
            location=LOCATION,
        )
        AGGREGATE_PRODUCTS.append(f"{search_term}.csv")

    # Phase 2: look up the product page of every row in each results file.
    for results_csv in AGGREGATE_PRODUCTS:
        threaded_item_lookup(
            results_csv,
            location=LOCATION,
            threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
The code above gives you a production ready Selenium scraper for Amazon... fully integrated with the ScrapeOps Proxy API!
- To change your results, simply change your constants.
- If you want detailed results on only one page of a search, change `PAGES` to 1.
- If you wish to run with 10 threads, change `MAX_THREADS` to 10... use caution with this one, each thread opens up another page in the proxy and ScrapeOps proxy does have a concurrency limit.
How To Architect Our Amazon Scraper
When we scrape Amazon, we need to pull valuable data from both our search results and individual item pages. When we search on Amazon, we get a bunch of pages and each page has a bunch of results.
Each item in our search also has its own page containing specific details about the item. You can get a better feel for these things if you take a look at the images below.
Results Page
Our Results page holds most of the information we want to scrape such as the product name, sale price, real price, and rating.
Product Page
The Product page holds much of the information we already find in the Results page and more. In our case specifically, we need the Product page because it holds bullet points and images specific to the item we're looking at.
When we review our results, we find phones we're interested in. When we want to look at details for a specific phone, we look at the page for that phone.
Understanding How To Scrape Amazon
Before plunging head first into code, we're going to talk about how our scraper works on a high level. In this section, we're going over the required steps in greater detail. If you've got some experience in web scraping already, feel free to skip this section.
Step 1: How To Request Amazon Pages
Let's take a better look at the URL from the page we looked at earlier.
`https://www.amazon.com/s?k=phone` is the portion you really need to pay attention to.
- `https://www.amazon.com/` is our base URL.
- `s?` shows that we're performing a search query.
- `k=phone` tells the Amazon server that we want to look at phones.
Their server takes all this information from the URL and sends us back a page of phones.
Step 2: How To Extract Data From Amazon Pages
While some sites store their data conveniently in a JSON blob, Amazon does not. Amazon nests their data deeply within divs and spans. To extract our data, we need to pull it from these elements nested within the HTML.
Let's first take a look at the Results page. Below, you can see an item title with the inspect window open. If you look closely, you'll see the title text is nested within a span
element.
Now, let's take a look at the product page. Look closely here as well. Our feature bullets are actually span
elements nested within li
(list) elements.
Step 3: How To Control Pagination
Controlling pagination is a pretty simple task. It just requires an additional parameter in our URL.
When pagination is added in, our URL will look like this:
https://www.amazon.com/s?k={product_name}&page={page_number}
So if we want to search page 1
of phones, this would be our URL:
https://www.amazon.com/s?k=phone&page=1
Step 4: Geolocated Data
Amazon does serve different content based on our location. If we're in the US, prices will be denoted in dollars, `$`. If we're in the UK, Amazon will give us our prices in pounds, `GBP`.
To control our location effectively, we'll be using the ScrapeOps Proxy API. The ScrapeOps API will route our traffic through servers in whichever country we ask for.
If we want to be in the UK, ScrapeOps will put us in the UK. If we want to be from the US, ScrapeOps will route us through servers in the US.
The ScrapeOps API is a perfect way to control your location because our requests are actually routed through the location we want.
Setting Up Our Amazon Scraper Project
Now that we know what we want to do, let's start building our scraper. First, we'll make a new project folder, and then we'll initialize a virtual environment and install dependencies.
Create a New Folder
mkdir amazon-scraper
From inside your new folder, create a new virtual environment.
Create a New Virtual Environment
python -m venv venv
Activate the Virtual Environment
source venv/bin/activate
Install Dependencies
pip install selenium
Make sure you have Chromedriver installed. You can find the latest version here. If you are using version 115 or higher, installations are much more manageable.
Build An Amazon Search Crawler
The first portion of our project will be spent building a crawler to scrape Amazon search results. This crawler will actually be grabbing the bulk of our data. This crawler needs to:
- parse results
- manage result batches using pagination
- store results from those pages
- search multiple pages concurrently
- integrate with a proxy for both location support and anti-bot resistance
Our ideal crawler will fetch a page. It will parse the information from the page to give us good results. Then it'll store those results in files for us to look at later. On top of all these things, it needs to use concurrency for speed and efficiency and it also needs to use a proxy so we don't get blocked.
Step 1: Create Simple Search Data Parser
Let's get started by creating a crawler that simply parses a Results page.
Here is a scraper with a simple parsing function.
- The parsing function below first finds all the `div` elements on the page.
- Then it checks if each `div` is `parsable`.
- If the `div` is parsable, we use the text of its `h2` as our `title`.
- We then extract the following from each listing:
asin
title
url
is_ad
pricing_unit
price
real_price
rating
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
# Configure the root logger to emit INFO-level messages and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the scraping function below.
logger = logging.getLogger(__name__)
# Chrome options shared by every driver this script starts; "--headless"
# runs the browser without opening a visible window.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")
# Placeholder API key; replace with your own before enabling proxy support.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
def search_products(product_name: str, retries=3):
    """Scrape the Amazon search results for ``product_name`` and print each listing.

    The page is fetched directly with a headless Chrome driver. Each listing
    that has a title, a product link and a price is printed as a dictionary.

    Args:
        product_name: Search term; it is URL-encoded before being sent.
        retries: Maximum number of attempts for the page.

    Any exception raised during an attempt is logged and uses up one try;
    nothing is re-raised to the caller.
    """
    # urlencode escapes spaces and reserved characters in the search term.
    url = "https://www.amazon.com/s?" + urlencode({"k": product_name})
    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so the finally clause never touches an unbound
        # name (or a driver from an earlier attempt) if Chrome fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)
            logger.info("Successfully fetched page")

            # Remove ad containers from the DOM before collecting listings.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    # No heading here. Skipping also prevents reusing the h2
                    # that was found for a previous div.
                    continue
                h2 = h2s[0]
                title = h2.text
                # Nested divs expose the same heading; skip consecutive repeats.
                if title == last_title:
                    continue

                a = h2.find_element(By.TAG_NAME, "a")
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                url_array = product_url.split("/")
                if len(url_array) < 6:
                    # URL too short to contain the segment used as the ASIN.
                    continue
                asin = url_array[5]

                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text

                # Strip thousands separators and any decimal point from the
                # whole part so float() accepts prices such as "1,299.".
                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole").text
                price_whole = price_whole.replace(",", "").replace(".", "")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
                price = float(f"{price_whole}.{price_decimal}")

                # A listing without a rating must not abort the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else None

                # Fall back to the sale price when no list price is shown.
                real_price = price
                real_price_elements = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_elements:
                    real_price_text = real_price_elements[0].text.replace(pricing_unit, "")
                    real_price_text = real_price_text.replace(",", "").strip()
                    if real_price_text:
                        real_price = float(real_price_text)

                product = {
                    "name": asin,
                    "title": title,
                    "url": product_url,
                    "is_ad": ad_status,
                    "pricing_unit": pricing_unit,
                    "price": price,
                    "real_price": real_price,
                    "rating": rating
                }
                print(product)
                last_title = title

            success = True
        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1
        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
if __name__ == "__main__":
    # Run settings: change these constants to change what gets scraped.
    PRODUCTS = ["phone"]
    MAX_RETRIES = 2

    for product in PRODUCTS:
        # Pass MAX_RETRIES through; it was defined but never used, so the
        # function default applied instead.
        search_products(product, retries=MAX_RETRIES)
If you run this example, you'll probably get blocked.
Amazon will likely continue to block us because we appear abnormal. We'll address this later on in our scraper when we add proxy support.
Step 2: Add Pagination
Now that we can parse a page, let's add pagination into our parsing function. Pagination gives us the ability to control our result batches. If we want page 1, fetch page 1. If we want page 2, fetch page 2... and so on and so forth.
The code example below is almost exactly the same as before. The major difference: we have a page_number
added to both our function arguments and our url.
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
# Configure the root logger to emit INFO-level messages and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the scraping function below.
logger = logging.getLogger(__name__)
# Chrome options shared by every driver this script starts; "--headless"
# runs the browser without opening a visible window.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")
# Placeholder API key; replace with your own before enabling proxy support.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
def search_products(product_name: str, page_number=1, retries=3):
    """Scrape one Amazon search results page and print each listing.

    The page is fetched directly with a headless Chrome driver. Each listing
    that has a title, a product link and a price is printed as a dictionary.

    Args:
        product_name: Search term; it is URL-encoded before being sent.
        page_number: Results page to request.
        retries: Maximum number of attempts for the page.

    Any exception raised during an attempt is logged and uses up one try;
    nothing is re-raised to the caller.
    """
    # urlencode escapes spaces and reserved characters in the search term.
    url = "https://www.amazon.com/s?" + urlencode({"k": product_name, "page": page_number})
    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so the finally clause never touches an unbound
        # name (or a driver from an earlier attempt) if Chrome fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)
            logger.info("Successfully fetched page")

            # Remove ad containers from the DOM before collecting listings.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    # No heading here. Skipping also prevents reusing the h2
                    # that was found for a previous div.
                    continue
                h2 = h2s[0]
                title = h2.text
                # Nested divs expose the same heading; skip consecutive repeats.
                if title == last_title:
                    continue

                a = h2.find_element(By.TAG_NAME, "a")
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                url_array = product_url.split("/")
                if len(url_array) < 6:
                    # URL too short to contain the segment used as the ASIN.
                    continue
                asin = url_array[5]

                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text

                # Strip thousands separators and any decimal point from the
                # whole part so float() accepts prices such as "1,299.".
                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole").text
                price_whole = price_whole.replace(",", "").replace(".", "")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
                price = float(f"{price_whole}.{price_decimal}")

                # A listing without a rating must not abort the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else None

                # Fall back to the sale price when no list price is shown.
                real_price = price
                real_price_elements = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_elements:
                    real_price_text = real_price_elements[0].text.replace(pricing_unit, "")
                    real_price_text = real_price_text.replace(",", "").strip()
                    if real_price_text:
                        real_price = float(real_price_text)

                product = {
                    "name": asin,
                    "title": title,
                    "url": product_url,
                    "is_ad": ad_status,
                    "pricing_unit": pricing_unit,
                    "price": price,
                    "real_price": real_price,
                    "rating": rating
                }
                print(product)
                last_title = title

            success = True
        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1
        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
if __name__ == "__main__":
    # Run settings: change these constants to change what gets scraped.
    PRODUCTS = ["phone"]
    MAX_RETRIES = 2
    PAGE = 2

    for product in PRODUCTS:
        # Pass MAX_RETRIES through; it was defined but never used, so the
        # function default applied instead.
        search_products(product, page_number=PAGE, retries=MAX_RETRIES)
As you can see above, not much has changed at all in our code. Our function now takes a page_number
and inserts it into our url.
Step 3: Storing the Scraped Data
Now that our crawler can choose a page to scrape, it's time to give it the ability to store our data.
In this section, we'll add a couple classes to do just that: ProductData
and DataPipeline
.
`ProductData` simply holds information from the objects we scrape.
`DataPipeline` does the job of filtering out duplicates and safely storing our data.
Here is our updated code example.
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
# Configure the root logger to emit INFO-level messages and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the pipeline and scraping function below.
logger = logging.getLogger(__name__)
# Chrome options shared by every driver this script starts; "--headless"
# runs the browser without opening a visible window.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")
# Placeholder API key; replace with your own before enabling proxy support.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
    """One product listing scraped from an Amazon search results page.

    After construction every string field is normalised: an empty string is
    replaced with ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace.
    """

    name: str = ""
    title: str = ""
    # These defaults used to end with a trailing comma, which silently turned
    # each one into a one-element tuple (e.g. ``("",)``) instead of a scalar.
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        # The loop variable is deliberately not called ``field`` so it cannot
        # shadow ``dataclasses.field``.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
class DataPipeline:
    """Buffer scraped dataclass items, drop duplicates and append them to a CSV.

    Items are de-duplicated on their ``name`` attribute and held in
    ``storage_queue``. When the queue reaches ``storage_queue_limit`` it is
    flushed to ``csv_filename``; ``close_pipeline`` flushes whatever is left.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file does not exist yet or is
        empty. Column names come from the dataclass fields of the first item.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Return before raising the "file open" flag, so an empty flush
            # cannot leave it stuck at True.
            return

        self.csv_file_open = True
        try:
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag even when the write raises.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before, else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush any remaining queued items to the CSV file."""
        # Imported here because the script's import block does not bring in
        # ``time``; without this the sleep below raised NameError.
        import time

        if self.csv_file_open:
            # A write is in progress; give it a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def search_products(product_name: str, page_number=1, retries=3, data_pipeline=None):
    """Scrape one Amazon search results page and queue every priced listing.

    The page is fetched directly with a headless Chrome driver. Each listing
    that has a title, a product link and a price becomes a ``ProductData``
    item that is handed to ``data_pipeline.add_data``.

    Args:
        product_name: Search term; it is URL-encoded before being sent.
        page_number: Results page to request.
        retries: Maximum number of attempts for the page.
        data_pipeline: Object exposing ``add_data(item)``.

    Any exception raised during an attempt is logged and uses up one try;
    nothing is re-raised to the caller.
    """
    # urlencode escapes spaces and reserved characters in the search term.
    url = "https://www.amazon.com/s?" + urlencode({"k": product_name, "page": page_number})
    tries = 0
    success = False

    while tries < retries and not success:
        # Bound before the try so the finally clause never touches an unbound
        # name (or a driver from an earlier attempt) if Chrome fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)
            logger.info("Successfully fetched page")

            # Remove ad containers from the DOM before collecting listings.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                    """, bad_div)

            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    # No heading here. Skipping also prevents reusing the h2
                    # that was found for a previous div.
                    continue
                h2 = h2s[0]
                title = h2.text
                # Nested divs expose the same heading; skip consecutive repeats.
                if title == last_title:
                    continue

                a = h2.find_element(By.TAG_NAME, "a")
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                url_array = product_url.split("/")
                if len(url_array) < 6:
                    # URL too short to contain the segment used as the ASIN.
                    continue
                asin = url_array[5]

                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text

                # Strip thousands separators and any decimal point from the
                # whole part so float() accepts prices such as "1,299.".
                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole").text
                price_whole = price_whole.replace(",", "").replace(".", "")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
                price = float(f"{price_whole}.{price_decimal}")

                # A listing without a rating must not abort the whole page;
                # ProductData turns the empty string into "No rating".
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else ""

                # Fall back to the sale price when no list price is shown.
                real_price = price
                real_price_elements = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_elements:
                    real_price_text = real_price_elements[0].text.replace(pricing_unit, "")
                    real_price_text = real_price_text.replace(",", "").strip()
                    if real_price_text:
                        real_price = float(real_price_text)

                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)
                last_title = title

            success = True
        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1
        finally:
            if driver is not None:
                driver.quit()

    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
if __name__ == "__main__":
    # Run settings: change these constants to change what gets scraped.
    PRODUCTS = ["phone"]
    MAX_RETRIES = 2

    for product in PRODUCTS:
        # One pipeline (and therefore one CSV file) per search term.
        output_csv = f"{product}.csv"
        product_pipeline = DataPipeline(csv_filename=output_csv)
        search_products(
            product,
            retries=MAX_RETRIES,
            data_pipeline=product_pipeline,
        )
        # Flush whatever is still queued once the page has been scraped.
        product_pipeline.close_pipeline()
In the example above, we add our ProductData
class to hold individual product data. We add a DataPipeline
as well.
Our DataPipeline
does all the heavy lifting of removing duplicates and saving our information to a CSV file.
Step 4: Adding Concurrency
When we added pagination earlier, we gave our crawler the ability to scrape different pages. Now that we can scrape a specific page and store its data, it's time to give our crawler the power to scrape a bunch of pages at once. With concurrency, we can do exactly that.
Here is our threaded_search()
function.
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1 through ``pages`` for ``product_name`` on a thread pool.

    Every page shares one ``DataPipeline`` that writes to
    ``"<product_name>.csv"``; the pipeline is closed once the pool has
    finished all submitted pages.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")

    # One positional-argument tuple per page, in page order.
    job_args = [
        (product_name, page_number, location, retries, search_pipeline)
        for page_number in range(1, pages + 1)
    ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Leaving the with-block waits for every page to finish.
        executor.map(lambda args: search_products(*args), job_args)

    search_pipeline.close_pipeline()
We use ThreadPoolExecutor
to manage our threads. This function will use 5 threads by default when performing searches, so we'll have a maximum of 5 searches going simultaneously. Be mindful when choosing how many threads to use. Not only does your machine have limits, but your ScrapeOps API key will likely also have a concurrency limit. You don't want to run threads past your limit... you'd just be wasting resources!
Here is our updated code. We also added a location
argument to search_products()
. While we don't use the location in this example, we'll be using it in the next section when we add proxy support.
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
# Configure the root logger to emit INFO-level messages and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the pipeline and scraping functions below.
logger = logging.getLogger(__name__)
# Chrome options shared by every driver this script starts; "--headless"
# runs the browser without opening a visible window.
OPTIONS = ChromeOptions()
OPTIONS.add_argument("--headless")
# Placeholder API key; replace with your own before enabling proxy support.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
    """One product listing scraped from an Amazon search results page.

    After construction every string field is normalised: an empty string is
    replaced with ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace.
    """

    name: str = ""
    title: str = ""
    # These defaults used to end with a trailing comma, which silently turned
    # each one into a one-element tuple (e.g. ``("",)``) instead of a scalar.
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with a placeholder and strip the rest."""
        # The loop variable is deliberately not called ``field`` so it cannot
        # shadow ``dataclasses.field``.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
class DataPipeline:
    """Buffer scraped dataclass items, drop duplicates and append them to a CSV.

    Items are de-duplicated on their ``name`` attribute and held in
    ``storage_queue``. When the queue reaches ``storage_queue_limit`` it is
    flushed to ``csv_filename``; ``close_pipeline`` flushes whatever is left.
    """

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file does not exist yet or is
        empty. Column names come from the dataclass fields of the first item.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Return before raising the "file open" flag, so an empty flush
            # cannot leave it stuck at True.
            return

        self.csv_file_open = True
        try:
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag even when the write raises.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before, else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush any remaining queued items to the CSV file."""
        # Imported here because the script's import block does not bring in
        # ``time``; without this the sleep below raised NameError.
        import time

        if self.csv_file_open:
            # A write is in progress; give it a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page into ``data_pipeline``.

    Every result tile that has a title link and a price becomes a
    ``ProductData`` handed to ``data_pipeline.add_data``. Errors are logged
    and the page is retried; nothing is raised to the caller.

    Args:
        product_name: Search term (URL-encoded before use).
        page_number: Value of the ``page`` query parameter.
        location: Not used by this version of the function.
        retries: Maximum number of attempts for this page.
        data_pipeline: Object exposing ``add_data(item)``.
    """
    # Function-scope import so the block does not rely on the script's import list.
    from urllib.parse import urlencode

    url = "https://www.amazon.com/s?" + urlencode({"k": product_name, "page": page_number})
    tries = 0
    success = False
    while tries < retries and not success:
        # Bound before ``try`` so ``finally`` never meets an undefined name
        # when the browser itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)
            logger.info("Successfully fetched page")
            # Remove the ad holder divs from the DOM before parsing.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                """, bad_div)
            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue
                h2 = h2s[0]
                title = h2.text
                # Skip a div that repeats the title of the item just stored.
                if title == last_title:
                    continue
                a = h2.find_element(By.TAG_NAME, "a")
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                url_array = product_url.split("/")
                if len(url_array) < 6:
                    # The link has no sixth path segment to use as the item key.
                    continue
                asin = url_array[5]
                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text
                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction")
                # Drop thousands separators ("1,299") before converting.
                price = float(f"{price_whole.text}.{price_decimal.text}".replace(",", ""))
                # A missing rating must not abort the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else None
                real_price = price
                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "").strip()
                    if real_price_str:
                        real_price = float(real_price_str)
                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)
                last_title = title
            success = True
        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1
        finally:
            if driver is not None:
                driver.quit()
    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1..``pages`` concurrently into ``<product_name>.csv``.

    Args:
        product_name: Search term; also used as the CSV file stem.
        pages: Number of result pages to scrape, starting at page 1.
        max_workers: Size of the thread pool.
        location: Forwarded to ``search_products``.
        retries: Forwarded to ``search_products``.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
    page_numbers = range(1, pages + 1)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(search_products, product_name, page, location, retries, search_pipeline)
            for page in page_numbers
        ]
        # Inspect every future so an exception raised in a worker is logged
        # rather than silently discarded.
        for page, future in zip(page_numbers, futures):
            error = future.exception()
            if error is not None:
                logger.warning(f"Search worker for page {page} raised: {error}")
    search_pipeline.close_pipeline()
if __name__ == "__main__":
    # Run settings for this crawl.
    PRODUCTS = ["phone"]
    MAX_RETRIES = 2
    PAGES = 2
    MAX_THREADS = 3
    LOCATION = "us"

    for product in PRODUCTS:
        # threaded_search() writes its results to "<product>.csv".
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
We're almost ready, but not quite. If you run the code above, you'll still most likely get blocked. To an Amazon server, our scraper already looked a bit abnormal. Now it's not only abnormal, it's exponentially faster than it was before. Let's add proxy support in the next section.
Step 5: Bypassing Anti-Bots
We're almost ready for our production run. It's time to add proxy support so Amazon stops blocking our crawler. We really only need to add one function here, get_scrapeops_url()
.
This function takes in a regular URL and uses basic string formatting to convert it into a URL that uses the ScrapeOps API. Take a look below:
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that wraps ``url``.

    Args:
        url: Target URL; sent URL-encoded as the ``url`` query parameter.
        location: Value sent as the ``country`` query parameter.

    Returns:
        ``https://proxy.scrapeops.io/v1/?`` followed by the encoded
        ``api_key``, ``url`` and ``country`` parameters.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
This function takes our url and formats it into a proxied url. Here is our updated code below.
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
# Emit log records of level INFO and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the classes and functions below.
logger = logging.getLogger(__name__)
# Chrome options passed to every webdriver.Chrome(...) call in this listing.
OPTIONS = ChromeOptions()
# Run Chrome without opening a visible browser window.
OPTIONS.add_argument("--headless")
# Placeholder key sent as "api_key" by get_scrapeops_url(); replace it with a real key.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that wraps ``url``.

    Args:
        url: Target URL; sent URL-encoded as the ``url`` query parameter.
        location: Value sent as the ``country`` query parameter.

    Returns:
        ``https://proxy.scrapeops.io/v1/?`` followed by the encoded
        ``api_key``, ``url`` and ``country`` parameters.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
@dataclass
class ProductData:
    """One listing scraped from an Amazon search-results page.

    After construction every string field is normalised: an empty string is
    replaced with ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    name: str = ""
    title: str = ""
    # The defaults below must not end with a comma: a trailing comma turns the
    # default into a one-element tuple such as ("",) or (False,).
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
class DataPipeline:
    """Drop items whose ``name`` was already seen and append the rest to a CSV file in batches."""

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # Advisory "a write is in progress" flag; it is not a lock.
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to ``csv_filename`` and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag on every exit path (empty queue, write error),
            # otherwise later batches would never be flushed.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it and return False."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue reaches its limit."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued, pausing first if a write is flagged as in progress."""
        if self.csv_file_open:
            # Function-scope import: the import list of this listing has no ``time``.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page through the proxy into ``data_pipeline``.

    Every result tile that has a title link and a price becomes a
    ``ProductData`` handed to ``data_pipeline.add_data``. Errors are logged
    and the page is retried; nothing is raised to the caller.

    Args:
        product_name: Search term (URL-encoded before use).
        page_number: Value of the ``page`` query parameter.
        location: Country code forwarded to ``get_scrapeops_url``.
        retries: Maximum number of attempts for this page.
        data_pipeline: Object exposing ``add_data(item)``.
    """
    url = "https://www.amazon.com/s?" + urlencode({"k": product_name, "page": page_number})
    tries = 0
    success = False
    while tries < retries and not success:
        # Bound before ``try`` so ``finally`` never meets an undefined name
        # when the browser itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            # Pass ``location`` through; otherwise the proxy always uses its default country.
            driver.get(get_scrapeops_url(url, location=location))
            logger.info("Successfully fetched page")
            # Remove the ad holder divs from the DOM before parsing.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                """, bad_div)
            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue
                h2 = h2s[0]
                title = h2.text
                # Skip a div that repeats the title of the item just stored.
                if title == last_title:
                    continue
                a = h2.find_element(By.TAG_NAME, "a")
                # Links come back pointing at the proxy host; restore the Amazon host.
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                url_array = product_url.split("/")
                if len(url_array) < 6:
                    # The link has no sixth path segment to use as the item key.
                    continue
                asin = url_array[5]
                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text
                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction")
                price = float(f"{price_whole.text}.{price_decimal.text}".replace(",", ""))
                # A missing rating must not abort the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else None
                real_price = price
                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "").strip()
                    if real_price_str:
                        real_price = float(real_price_str)
                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)
                last_title = title
            success = True
        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1
        finally:
            if driver is not None:
                driver.quit()
    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1..``pages`` concurrently into ``<product_name>.csv``.

    Args:
        product_name: Search term; also used as the CSV file stem.
        pages: Number of result pages to scrape, starting at page 1.
        max_workers: Size of the thread pool.
        location: Forwarded to ``search_products``.
        retries: Forwarded to ``search_products``.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
    page_numbers = range(1, pages + 1)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(search_products, product_name, page, location, retries, search_pipeline)
            for page in page_numbers
        ]
        # Inspect every future so an exception raised in a worker is logged
        # rather than silently discarded.
        for page, future in zip(page_numbers, futures):
            error = future.exception()
            if error is not None:
                logger.warning(f"Search worker for page {page} raised: {error}")
    search_pipeline.close_pipeline()
if __name__ == "__main__":
    # Run settings for this crawl.
    PRODUCTS = ["phone"]
    MAX_RETRIES = 2
    PAGES = 2
    MAX_THREADS = 3
    LOCATION = "us"

    for product in PRODUCTS:
        # threaded_search() writes its results to "<product>.csv".
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
Now that we can get past anti-bots, we're ready to move on to our production run.
Step 6: Production Run
Time for our production run. Take a look at our main function below.
if __name__ == "__main__":
    # Production run settings.
    PRODUCTS = ["phone"]
    MAX_RETRIES = 4
    PAGES = 3
    MAX_THREADS = 3
    LOCATION = "us"

    for product in PRODUCTS:
        # threaded_search() writes its results to "<product>.csv".
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
You can change any of the following constants to change your results:
PRODUCTS
MAX_RETRIES
PAGES
MAX_THREADS
LOCATION
To run this scraper, replace the filename below with whatever you chose to name yours.
python crawler-proxy.py
Our final crawler generated a report on 3 pages full of phones in 52.5 seconds. When running in production, be cautious of your MAX_THREADS
.
Selenium can be vulnerable to both thread locking and "stale elements". If you are noticing stale element errors, decrease your MAX_THREADS
. Each thread is running its own browser and this can get resource intensive.
Here is the report it created:
Build An Amazon Product Scraper
Now it's time to build a scraper that looks up individual products. From these individual product pages, we need to extract feature bullets, prices, and images. This way, if you're interested in a product, simply pull up your report for that product!
Step 1: Create Simple Amazon Product Page Data Parser
Here's a parsing function that retrieves data from a product page. We're not ready to add it into our scraper because we need the ability to read the CSV we created earlier.
def parse_product(product_object, location="us", retries=3):
    """Open one product page and print its price, feature bullets and image links.

    Args:
        product_object: Mapping with a ``"url"`` key holding the product-page URL.
        location: Not used by this version of the function.
        retries: Retries allowed after the first attempt (``retries + 1`` attempts in total).

    Returns:
        None. Failures are logged, not raised.
    """
    product_url = product_object["url"]
    url_array = product_url.split("/")
    if len(url_array) < 4:
        # Title and ASIN are read from fixed positions counted from the end of the URL.
        logger.warning(f"Cannot read title/ASIN from product URL: {product_url}")
        return None
    title = url_array[-4]
    asin = url_array[-2]
    tries = 0
    success = False
    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(product_url)
            images_to_save = []
            features = []
            for image in driver.find_elements(By.CSS_SELECTOR, "li img"):
                image_link = image.get_attribute("src")
                if not image_link:
                    continue
                if "https://m.media-amazon.com/images/I/" in image_link and image_link not in images_to_save:
                    images_to_save.append(image_link)
            for feature in driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini"):
                text = feature.find_element(By.TAG_NAME, "span").text
                if text not in features:
                    features.append(text)
            price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text
            whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
            decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
            price = float(f"{whole_number}.{decimal}")
            if not images_to_save or not features:
                # Count this as a failed attempt; without raising, ``tries``
                # would never change and the loop could spin forever.
                raise ValueError("no product images or feature bullets found")
            item_data = {
                "name": asin,
                "title": title,
                "url": product_url,
                "pricing_unit": price_symbol,
                "price": price,
            }
            # Fixed four slots each, padded with "n/a".
            for slot in range(4):
                item_data[f"feature_{slot + 1}"] = features[slot] if slot < len(features) else "n/a"
            for slot in range(4):
                item_data[f"images_{slot + 1}"] = images_to_save[slot] if slot < len(images_to_save) else "n/a"
            print(item_data)
            success = True
        except Exception as e:
            driver.save_screenshot("PARSE_ERROR.png")
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()
    return None
In the above function, we pull the features and item images from the product page. These will be used in the individual report we generate for each product.
Step 2: Loading URLs To Scrape
Now it's time to give our code the ability to run. In order to parse these items, we need to read them from a CSV file and then pass them into our parse function. The code example below adds a threaded_item_lookup()
function.
At the moment, this function does not use threading. We just have a for
loop as a placeholder.
This function reads the CSV file and then passes each object from the file into parse_product()
.
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
# Emit log records of level INFO and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the classes and functions below.
logger = logging.getLogger(__name__)
# Chrome options passed to every webdriver.Chrome(...) call in this listing.
OPTIONS = ChromeOptions()
# Run Chrome without opening a visible browser window.
OPTIONS.add_argument("--headless")
# Placeholder API key; nothing in this listing reads it.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
    """One listing scraped from an Amazon search-results page.

    After construction every string field is normalised: an empty string is
    replaced with ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    name: str = ""
    title: str = ""
    # The defaults below must not end with a comma: a trailing comma turns the
    # default into a one-element tuple such as ("",) or (False,).
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
class DataPipeline:
    """Drop items whose ``name`` was already seen and append the rest to a CSV file in batches."""

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # Advisory "a write is in progress" flag; it is not a lock.
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to ``csv_filename`` and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag on every exit path (empty queue, write error),
            # otherwise later batches would never be flushed.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it and return False."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue reaches its limit."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued, pausing first if a write is flagged as in progress."""
        if self.csv_file_open:
            # Function-scope import: the import list of this listing has no ``time``.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page into ``data_pipeline``.

    Every result tile that has a title link and a price becomes a
    ``ProductData`` handed to ``data_pipeline.add_data``. Errors are logged
    and the page is retried; nothing is raised to the caller.

    Args:
        product_name: Search term (URL-encoded before use).
        page_number: Value of the ``page`` query parameter.
        location: Not used by this version of the function.
        retries: Maximum number of attempts for this page.
        data_pipeline: Object exposing ``add_data(item)``.
    """
    url = "https://www.amazon.com/s?" + urlencode({"k": product_name, "page": page_number})
    tries = 0
    success = False
    while tries < retries and not success:
        # Bound before ``try`` so ``finally`` never meets an undefined name
        # when the browser itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)
            logger.info("Successfully fetched page")
            # Remove the ad holder divs from the DOM before parsing.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                """, bad_div)
            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue
                h2 = h2s[0]
                title = h2.text
                # Skip a div that repeats the title of the item just stored.
                if title == last_title:
                    continue
                a = h2.find_element(By.TAG_NAME, "a")
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                url_array = product_url.split("/")
                if len(url_array) < 6:
                    # The link has no sixth path segment to use as the item key.
                    continue
                asin = url_array[5]
                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text
                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction")
                # Drop thousands separators ("1,299") before converting.
                price = float(f"{price_whole.text}.{price_decimal.text}".replace(",", ""))
                # A missing rating must not abort the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else None
                real_price = price
                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "").strip()
                    if real_price_str:
                        real_price = float(real_price_str)
                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)
                last_title = title
            success = True
        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1
        finally:
            if driver is not None:
                driver.quit()
    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1..``pages`` concurrently into ``<product_name>.csv``.

    Args:
        product_name: Search term; also used as the CSV file stem.
        pages: Number of result pages to scrape, starting at page 1.
        max_workers: Size of the thread pool.
        location: Forwarded to ``search_products``.
        retries: Forwarded to ``search_products``.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
    page_numbers = range(1, pages + 1)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(search_products, product_name, page, location, retries, search_pipeline)
            for page in page_numbers
        ]
        # Inspect every future so an exception raised in a worker is logged
        # rather than silently discarded.
        for page, future in zip(page_numbers, futures):
            error = future.exception()
            if error is not None:
                logger.warning(f"Search worker for page {page} raised: {error}")
    search_pipeline.close_pipeline()
def parse_product(product_object, location="us", retries=3):
    """Open one product page and print its price, feature bullets and image links.

    Args:
        product_object: Mapping with a ``"url"`` key holding the product-page URL.
        location: Not used by this version of the function.
        retries: Retries allowed after the first attempt (``retries + 1`` attempts in total).

    Returns:
        None. Failures are logged, not raised.
    """
    product_url = product_object["url"]
    url_array = product_url.split("/")
    if len(url_array) < 4:
        # Title and ASIN are read from fixed positions counted from the end of the URL.
        logger.warning(f"Cannot read title/ASIN from product URL: {product_url}")
        return None
    title = url_array[-4]
    asin = url_array[-2]
    tries = 0
    success = False
    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(product_url)
            images_to_save = []
            features = []
            for image in driver.find_elements(By.CSS_SELECTOR, "li img"):
                image_link = image.get_attribute("src")
                if not image_link:
                    continue
                if "https://m.media-amazon.com/images/I/" in image_link and image_link not in images_to_save:
                    images_to_save.append(image_link)
            for feature in driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini"):
                text = feature.find_element(By.TAG_NAME, "span").text
                if text not in features:
                    features.append(text)
            price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text
            whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
            decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
            price = float(f"{whole_number}.{decimal}")
            if not images_to_save or not features:
                # Count this as a failed attempt; without raising, ``tries``
                # would never change and the loop could spin forever.
                raise ValueError("no product images or feature bullets found")
            item_data = {
                "name": asin,
                "title": title,
                "url": product_url,
                "pricing_unit": price_symbol,
                "price": price,
            }
            # Fixed four slots each, padded with "n/a".
            for slot in range(4):
                item_data[f"feature_{slot + 1}"] = features[slot] if slot < len(features) else "n/a"
            for slot in range(4):
                item_data[f"images_{slot + 1}"] = images_to_save[slot] if slot < len(images_to_save) else "n/a"
            print(item_data)
            success = True
        except Exception as e:
            driver.save_screenshot("PARSE_ERROR.png")
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()
    return None
def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    """Read every row of ``csv_filename`` and parse each product page in turn.

    Args:
        csv_filename: CSV file whose rows contain a ``url`` column.
        location: Forwarded to ``parse_product``.
        retries: Forwarded to ``parse_product``.
        threads: Not used yet; rows are processed sequentially.
    """
    # Read with the same encoding and newline handling the file is written with.
    with open(csv_filename, newline='', encoding='utf-8') as csvfile:
        rows = list(csv.DictReader(csvfile))
    for row in rows:
        parse_product(row, location=location, retries=retries)
if __name__ == "__main__":
    # Run settings for the crawl and the product lookups.
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []
    MAX_RETRIES = 2
    PAGES = 1
    MAX_THREADS = 3
    LOCATION = "us"

    # Step 1: crawl the search results; each search term ends up in "<product>.csv".
    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        filename = f"{product}.csv"
        AGGREGATE_PRODUCTS.append(filename)

    # Step 2: look up every product listed in each crawl report.
    for report_file in AGGREGATE_PRODUCTS:
        threaded_item_lookup(report_file, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
You might be wondering why we use a separate field for each bullet point and image. The reason is simple: the csv module writes one flat value per column, so a variable-length list does not map cleanly onto a CSV row. Fixed fields such as feature_1 through feature_4 also keep our @dataclass simple, because a dataclass field cannot use a mutable value such as a list as its default (Python lists are mutable).
Step 3: Storing the Scraped Data
Similar to how we stored our data with the results crawler, we're going to be using a custom class to hold our data. We'll then pass this object into our DataPipeline
to both filter and store our data.
The code below adds a ProductPageData
class and passes it into our new pipeline for safe storage.
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import logging, os
import json, csv
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode
from concurrent.futures import ThreadPoolExecutor
# Emit log records of level INFO and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the classes and functions below.
logger = logging.getLogger(__name__)
# Chrome options passed to every webdriver.Chrome(...) call in this listing.
OPTIONS = ChromeOptions()
# Run Chrome without opening a visible browser window.
OPTIONS.add_argument("--headless")
# Placeholder API key; nothing in this listing reads it.
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
@dataclass
class ProductData:
    """One listing scraped from an Amazon search-results page.

    After construction every string field is normalised: an empty string is
    replaced with ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    name: str = ""
    title: str = ""
    # The defaults below must not end with a comma: a trailing comma turns the
    # default into a one-element tuple such as ("",) or (False,).
    url: str = ""
    is_ad: bool = False
    pricing_unit: str = ""
    price: float = None
    real_price: float = None
    rating: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
@dataclass
class ProductPageData:
    """Details scraped from one product page: price, four feature bullets, four image links.

    After construction every string field is normalised: an empty string is
    replaced with ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    name: str = ""
    title: str = ""
    # The defaults below must not end with a comma: a trailing comma turns the
    # default into a one-element tuple such as ("",).
    url: str = ""
    pricing_unit: str = ""
    price: float = None
    feature_1: str = ""
    feature_2: str = ""
    feature_3: str = ""
    feature_4: str = ""
    images_1: str = ""
    images_2: str = ""
    images_3: str = ""
    images_4: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == '':
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
class DataPipeline:
    """Drop items whose ``name`` was already seen and append the rest to a CSV file in batches."""

    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # Advisory "a write is in progress" flag; it is not a lock.
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to ``csv_filename`` and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag on every exit path (empty queue, write error),
            # otherwise later batches would never be flushed.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it and return False."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue reaches its limit."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued, pausing first if a write is flagged as in progress."""
        if self.csv_file_open:
            # Function-scope import: the import list of this listing has no ``time``.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def search_products(product_name: str, page_number=1, location="us", retries=3, data_pipeline=None):
    """Scrape one Amazon search-results page into ``data_pipeline``.

    Every result tile that has a title link and a price becomes a
    ``ProductData`` handed to ``data_pipeline.add_data``. Errors are logged
    and the page is retried; nothing is raised to the caller.

    Args:
        product_name: Search term (URL-encoded before use).
        page_number: Value of the ``page`` query parameter.
        location: Not used by this version of the function.
        retries: Maximum number of attempts for this page.
        data_pipeline: Object exposing ``add_data(item)``.
    """
    url = "https://www.amazon.com/s?" + urlencode({"k": product_name, "page": page_number})
    tries = 0
    success = False
    while tries < retries and not success:
        # Bound before ``try`` so ``finally`` never meets an undefined name
        # when the browser itself fails to start.
        driver = None
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)
            logger.info("Successfully fetched page")
            # Remove the ad holder divs from the DOM before parsing.
            for bad_div in driver.find_elements(By.CSS_SELECTOR, "div.AdHolder"):
                driver.execute_script("""
                    var element = arguments[0];
                    element.parentNode.removeChild(element);
                """, bad_div)
            last_title = ""
            for div in driver.find_elements(By.TAG_NAME, "div"):
                h2s = div.find_elements(By.TAG_NAME, "h2")
                if not h2s:
                    continue
                h2 = h2s[0]
                title = h2.text
                # Skip a div that repeats the title of the item just stored.
                if title == last_title:
                    continue
                a = h2.find_element(By.TAG_NAME, "a")
                product_url = (a.get_attribute("href") or "").replace("proxy.scrapeops.io", "www.amazon.com")
                ad_status = "sspa" in product_url
                url_array = product_url.split("/")
                if len(url_array) < 6:
                    # The link has no sixth path segment to use as the item key.
                    continue
                asin = url_array[5]
                price_symbols = div.find_elements(By.CSS_SELECTOR, "span.a-price-symbol")
                if not price_symbols:
                    continue
                pricing_unit = price_symbols[0].text
                price_whole = div.find_element(By.CSS_SELECTOR, "span.a-price-whole")
                price_decimal = div.find_element(By.CSS_SELECTOR, "span.a-price-fraction")
                # Drop thousands separators ("1,299") before converting.
                price = float(f"{price_whole.text}.{price_decimal.text}".replace(",", ""))
                # A missing rating must not abort the whole page.
                rating_elements = div.find_elements(By.CLASS_NAME, "a-icon-alt")
                rating = rating_elements[0].get_attribute("innerHTML") if rating_elements else None
                real_price = price
                real_price_array = div.find_elements(By.CSS_SELECTOR, "span.a-price.a-text-price")
                if real_price_array:
                    real_price_str = real_price_array[0].text.replace(pricing_unit, "").replace(",", "").strip()
                    if real_price_str:
                        real_price = float(real_price_str)
                product = ProductData(
                    name=asin,
                    title=title,
                    url=product_url,
                    is_ad=ad_status,
                    pricing_unit=pricing_unit,
                    price=price,
                    real_price=real_price,
                    rating=rating
                )
                data_pipeline.add_data(product)
                last_title = title
            success = True
        except Exception as e:
            logger.warning(f"Failed to scrape page, {e}")
            tries += 1
        finally:
            if driver is not None:
                driver.quit()
    if not success:
        logger.warning(f"Failed to scrape page, retries exceeded: {retries}")
def threaded_search(product_name, pages, max_workers=5, location="us", retries=3):
    """Scrape result pages 1..``pages`` concurrently into ``<product_name>.csv``.

    Args:
        product_name: Search term; also used as the CSV file stem.
        pages: Number of result pages to scrape, starting at page 1.
        max_workers: Size of the thread pool.
        location: Forwarded to ``search_products``.
        retries: Forwarded to ``search_products``.
    """
    search_pipeline = DataPipeline(csv_filename=f"{product_name}.csv")
    page_numbers = range(1, pages + 1)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(search_products, product_name, page, location, retries, search_pipeline)
            for page in page_numbers
        ]
        # Inspect every future so an exception raised in a worker is logged
        # rather than silently discarded.
        for page, future in zip(page_numbers, futures):
            error = future.exception()
            if error is not None:
                logger.warning(f"Search worker for page {page} raised: {error}")
    search_pipeline.close_pipeline()
def parse_product(product_object, location="us", retries=3):
    """Scrape one product page and store the result in its own ``<title>.csv`` report.

    Args:
        product_object: Mapping with a ``"url"`` key holding the product-page URL.
        location: Not used by this version of the function.
        retries: Retries allowed after the first attempt (``retries + 1`` attempts in total).

    Returns:
        None. Failures are logged, not raised.
    """
    product_url = product_object["url"]
    url_array = product_url.split("/")
    if len(url_array) < 4:
        # Title and ASIN are read from fixed positions counted from the end of the URL.
        logger.warning(f"Cannot read title/ASIN from product URL: {product_url}")
        return None
    title = url_array[-4]
    print(title)
    # One pipeline, and therefore one CSV report, per product.
    product_pipeline = DataPipeline(csv_filename=f"{title}.csv")
    asin = url_array[-2]
    tries = 0
    success = False
    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(product_url)
            images_to_save = []
            features = []
            for image in driver.find_elements(By.CSS_SELECTOR, "li img"):
                image_link = image.get_attribute("src")
                if not image_link:
                    continue
                if "https://m.media-amazon.com/images/I/" in image_link and image_link not in images_to_save:
                    images_to_save.append(image_link)
            for feature in driver.find_elements(By.CSS_SELECTOR, "li.a-spacing-mini"):
                text = feature.find_element(By.TAG_NAME, "span").text
                if text not in features:
                    features.append(text)
            price_symbol = driver.find_element(By.CSS_SELECTOR, "span.a-price-symbol").text
            whole_number = driver.find_element(By.CSS_SELECTOR, "span.a-price-whole").text.replace(",", "").replace(".", "")
            decimal = driver.find_element(By.CSS_SELECTOR, "span.a-price-fraction").text
            price = float(f"{whole_number}.{decimal}")
            if not images_to_save or not features:
                # Count this as a failed attempt; without raising, ``tries``
                # would never change and the loop could spin forever.
                raise ValueError("no product images or feature bullets found")
            # Fixed four slots each, padded with "n/a".
            slots = {}
            for slot in range(4):
                slots[f"feature_{slot + 1}"] = features[slot] if slot < len(features) else "n/a"
                slots[f"images_{slot + 1}"] = images_to_save[slot] if slot < len(images_to_save) else "n/a"
            item_data = ProductPageData(
                name=asin,
                title=title,
                url=product_url,
                pricing_unit=price_symbol,
                price=price,
                **slots
            )
            product_pipeline.add_data(item_data)
            product_pipeline.close_pipeline()
            success = True
        except Exception as e:
            driver.save_screenshot("PARSE_ERROR.png")
            logger.warning(f"Failed to parse item: {e}, tries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()
    return None
def threaded_item_lookup(csv_filename, location="us", retries=3, threads=3):
    """Read every row of ``csv_filename`` and parse each product page in turn.

    Args:
        csv_filename: CSV file whose rows contain a ``url`` column.
        location: Forwarded to ``parse_product``.
        retries: Forwarded to ``parse_product``.
        threads: Not used yet; rows are processed sequentially.
    """
    # Read with the same encoding and newline handling the file is written with.
    with open(csv_filename, newline='', encoding='utf-8') as csvfile:
        rows = list(csv.DictReader(csvfile))
    for row in rows:
        parse_product(row, location=location, retries=retries)
if __name__ == "__main__":
    # Run settings for the crawl and the product lookups.
    PRODUCTS = ["phone"]
    AGGREGATE_PRODUCTS = []
    MAX_RETRIES = 2
    PAGES = 1
    MAX_THREADS = 3
    LOCATION = "us"

    # Step 1: crawl the search results; each search term ends up in "<product>.csv".
    for product in PRODUCTS:
        threaded_search(product, PAGES, max_workers=MAX_THREADS, retries=MAX_RETRIES, location=LOCATION)
        filename = f"{product}.csv"
        AGGREGATE_PRODUCTS.append(filename)

    # Step 2: look up every product listed in each crawl report.
    for report_file in AGGREGATE_PRODUCTS:
        threaded_item_lookup(report_file, location=LOCATION, threads=MAX_THREADS, retries=MAX_RETRIES)
You may have also noticed that from inside parse_product()
, we open up an individual pipeline for each product.
This way, we generate an individual report for each one of the products we scraped earlier with the crawler. If you want to see details about a specific item, you can just open the report for that item!!!
Step 4: Adding Concurrency
Now, we're going to add concurrency so we can parse multiple products at once. This is very similar to when we added concurrency to the crawler earlier.
Take a look at the function below, it's the finished version of threaded_item_lookup()
:
def threaded_item_lookup(csv_filename, location="us", retries=3