How to Scrape Etsy With Requests and BeautifulSoup
Etsy is home to a seemingly endless stream of products created by small businesses. You might think Etsy would welcome scrapers for the extra exposure its products get; however, that is not the case. Etsy employs some of the most difficult anti-bot systems we've encountered in this series.
Today, we're going to scrape coffee mugs from Etsy, but this project applies to just about anything you'd want to scrape from Etsy.
- TLDR - How to Scrape Etsy
- How To Architect Our Etsy Scraper
- Understanding How To Scrape Etsy
- Setting Up Our Etsy Scraper Project
- Build An Etsy Search Crawler
- Build An Etsy Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
TLDR - How to Scrape Etsy
Need to scrape Etsy? Don't have time to read?
We've got you covered.
The program below takes a keyword and performs a crawl. Then, it scrapes reviews for each item it found during the crawl.
To use it:
- Create a new project folder.
- After you've made a new folder, make a config.json file.
- Inside the config file, add your API key: {"api_key": "your-super-secret-api-key"}.
- Once you've done that, copy and paste the code below into a new Python file.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"bypass": "generic_level_4",
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
price_currency: str = ""
listing_id: int = 0
current_price: float = 0.0
original_price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
review: str = ""
stars: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="wt-height-full")
result_count = 0
last_listing = ""
for div_card in div_cards:
title = div_card.find("h3")
if not title:
continue
name = title.get("title")
a_tag = div_card.find("a")
listing_id = a_tag.get("data-listing-id")
if listing_id == last_listing:
continue
link = a_tag.get("href")
stars = 0.0
has_stars = div_card.find("span", class_="wt-text-title-small")
if has_stars:
stars = float(has_stars.text)
currency = "n/a"
currency_holder = div_card.find("span", class_="currency-symbol")
if currency_holder:
currency = currency_holder.text
prices = div_card.find_all("span", class_="currency-value")
if len(prices) < 1:
continue
current_price = prices[0].text
original_price = current_price
if len(prices) > 1:
original_price = prices[1].text
search_data = SearchData(
name=name,
stars=stars,
url=link,
price_currency=currency,
listing_id=listing_id,
current_price=current_price,
original_price=original_price
)
data_pipeline.add_data(search_data)
result_count+=1
last_listing = listing_id
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = []
for review_rank in range(4):
card = soup.select_one(f"div[id='review-text-width-{review_rank}']")
if card:
review_cards.append(card)
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv")
for review_card in review_cards:
rating = review_card.select_one("input[name='rating']").get("value")
review = review_card.find("p").text.strip()
name_date_holder = review_card.find("a", class_="wt-text-link wt-mr-xs-1")
if not name_date_holder:
continue
name = name_date_holder.get("aria-label").replace("Reviewer ", "")
if not name:
name = "n/a"
date = name_date_holder.parent.text.strip().replace(name, "")
if date == "":
continue
review_data = ReviewData(
name=name,
date=date,
review=review,
stars=rating
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
For customization, feel free to change any of the following:
- keyword_list: Contains a list of keywords to be searched and scraped.
- MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
- PAGES: Specifies the number of pages to scrape for each keyword.
- LOCATION: Defines the geographic location from which the scraping requests appear to originate.
WARNING: This code uses the bypass setting of generic_level_4, which costs 85 API credits per call. This configuration is significantly more expensive than standard requests to the ScrapeOps API.
How To Architect Our Etsy Scraper
As mentioned earlier, Etsy uses some incredibly strong anti-bot systems, and it nests its data in a very messy way.
To handle these situations, we'll use some simple techniques with BeautifulSoup, and we'll use the ScrapeOps Proxy Aggregator's bypass method to get past the anti-bots. The bypass parameter costs extra API credits, so this is an expensive scrape to run.
Conceptually, this is pretty similar to other scraping projects we've made in this series. We need both a search crawler and a review scraper.
We'll build the search crawler in the following steps:
- Parsing Etsy search results.
- Controlling our search results through pagination.
- Storing our extracted search results.
- Parsing pages and storing data with concurrency.
- Bypassing anti-bots with proxy integration.
Then, we're going to build a review scraper following these steps:
- Parsing product reviews.
- Reading the crawler report so we can parse reviews for each product.
- Adding data storage for our extracted reviews.
- Parsing reviews and storing data concurrently.
- Using proxy integration to once again bypass anti-bots.
Understanding How To Scrape Etsy
Now, we need to get a better understanding of how to get Etsy pages and how to extract their data. In the coming sections we'll look at:
- How To Request Etsy Pages
- How To Extract Data From Etsy
- How To Control Pagination
- How To Control Our Geolocation
Step 1: How To Request Etsy Pages
As with any website, we always begin with a simple GET request.
- When you visit a website, you're performing a GET request with your browser.
- Your browser receives an HTML page as a response.
- The browser then reads this HTML and renders the webpage for you to view.
- With a simple HTTP client (like Python Requests), nothing renders the page.
With Python Requests, we need to code our scraper to read the page and pull the data.
Take a look at the screenshot below. This is an Etsy search results page. As you can see, the URL is:
https://www.etsy.com/search?q=coffee+mug&ref=pagination&page=2
We're going to ignore the pagination at the moment and just focus on the rest of the URL:
https://www.etsy.com/search?q=coffee+mug&ref=pagination
q=coffee+mug represents our query. q is for "query" and coffee+mug represents the value, "coffee mug".
Our reconstructed URLs will look like this:
https://www.etsy.com/search?q={formatted_keyword}&ref=pagination
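To make this concrete, here's a minimal sketch (not part of the final scraper) that builds a search URL from a keyword and requests it with Python Requests. Without a proxy or anti-bot bypass, expect Etsy to block or challenge a plain request like this.
import requests

keyword = "coffee mug"
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination"

# A plain GET request; don't be surprised by a non-200 status or a
# challenge page unless the request is routed through an anti-bot bypass.
response = requests.get(url)
print(response.status_code)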
We'll be extracting our product URLs during the crawl. The product pages look like the one below. If you scroll down far enough, you'll find the product reviews.
Step 2: How To Extract Data From Etsy Results and Pages
Extracting data from the search pages can be pretty difficult. Each product is held inside a div with the class wt-height-full. This is difficult because there are other, non-product items on the page embedded within div elements of this same kind.
When we build our crawler, we'll need to add some code to filter these elements out of our search. Take a look at the page below; it shows a result item highlighted using the browser's inspect tool.
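As a rough sketch of that filtering (assuming the wt-height-full class is still what Etsy uses), we can grab every candidate card and skip any that don't contain an h3 title:
from bs4 import BeautifulSoup

def extract_product_cards(html_text):
    # html_text is assumed to be the raw HTML of an Etsy search results page.
    soup = BeautifulSoup(html_text, "html.parser")
    product_cards = []
    for div_card in soup.find_all("div", class_="wt-height-full"):
        # Non-product divs share this class but have no h3 title, so skip them.
        if not div_card.find("h3"):
            continue
        product_cards.append(div_card)
    return product_cards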
Extracting our reviews is somewhat easier. Reviews are a little more uniform.
- The highlighted review is a div with an id of review-text-width-0.
- However, if you inspect these reviews on the page, the number at the end (0) increments with each review.
- The first review is review-text-width-0.
- The second review is review-text-width-1, and it keeps incrementing with each review (see the sketch below).
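Here's a minimal sketch of how those incrementing ids can be used with BeautifulSoup's CSS selectors. Grabbing only the first four reviews mirrors the final scraper, but the range is an assumption you can easily adjust:
from bs4 import BeautifulSoup

def find_review_cards(soup, max_reviews=4):
    # Each review container has an id like review-text-width-0, -1, -2, ...
    review_cards = []
    for review_rank in range(max_reviews):
        card = soup.select_one(f"div[id='review-text-width-{review_rank}']")
        if card:
            review_cards.append(card)
    return review_cards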
Step 3: How To Control Pagination
Pagination is really simple to control. When we examined our URL earlier, we omitted the page parameter. To control our page, we simply add page={page_number+1} to our URL. We use page_number+1 because Python's range() begins counting at 0, but our page numbers begin at 1.
Our fully formatted URL with pagination would look like this:
https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}
Note: ref=pagination might look like it's relevant to our pagination, but it is not. ref is short for referrer or referral. ref=pagination tells Etsy that we were referred to the page via its pagination system. Leaving this parameter in makes us look less like a bot.
A normal person is going to visit page 2 by clicking the page 2 button, which gives us a referral to the page using the pagination.
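As a quick sketch, assuming we want the first three pages for a keyword, the range() offset looks like this:
keyword = "coffee mug"
formatted_keyword = keyword.replace(" ", "+")

# range(3) yields 0, 1, 2, which maps to Etsy pages 1, 2, 3.
for page_number in range(3):
    url = (
        f"https://www.etsy.com/search?q={formatted_keyword}"
        f"&ref=pagination&page={page_number+1}"
    )
    print(url)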
Step 4: Geolocated Data
Geolocated data is pretty important; we don't want inconsistent results. If we don't control our location, we could get results in dollars on one page, in pounds on another, and in euros on another.
For geolocation, we'll be using the country parameter with the ScrapeOps API.
- When we talk to ScrapeOps, we can pass "country": "us" if we want to appear in the US.
- If we want to appear in the UK, we can pass "country": "uk".
You can view our full list of supported countries here.
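Here's a minimal sketch of how the country parameter fits into the ScrapeOps proxy payload; the full proxy function (with the bypass setting added) appears later in this article.
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"

def get_scrapeops_url(url, location="us"):
    # location controls the geolocation our requests appear to come from.
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

print(get_scrapeops_url("https://www.etsy.com/search?q=coffee+mug", location="uk"))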
Setting Up Our Etsy Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir etsy-scraper
cd etsy-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build An Etsy Search Crawler
It's finally time to start building! Our crawler needs to perform a keyword search and save the results in a CSV file. It should be able to search multiple pages concurrently and it should also utilize proxy integration to bypass Etsy's anti-bot system. We'll build our features in the following steps:
- Parsing
- Pagination
- Data Storage
- Concurrency
- Proxy Integration
Step 1: Create Simple Search Data Parser
Let's get started with our parser. In the code example below, we'll write a basic script with error handling, retries, and a parsing function.
While the other portions of the script are important, you should pay close attention to our parsing function... this is where the magic happens.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="wt-height-full")
result_count = 0
last_listing = ""
for div_card in div_cards:
title = div_card.find("h3")
if not title:
continue
name = title.get("title")
a_tag = div_card.find("a")
listing_id = a_tag.get("data-listing-id")
if listing_id == last_listing:
continue
link = a_tag.get("href")
stars = 0.0
has_stars = div_card.find("span", class_="wt-text-title-small")
if has_stars:
stars = float(has_stars.text)
currency = "n/a"
currency_holder = div_card.find("span", class_="currency-symbol")
if currency_holder:
currency = currency_holder.text
prices = div_card.find_all("span", class_="currency-value")
if len(prices) < 1:
continue
current_price = prices[0].text
original_price = current_price
if len(prices) > 1:
original_price = prices[1].text
search_data = {
"name": name,
"stars": stars,
"url": link,
"price_currency": currency,
"listing_id": listing_id,
"current_price": current_price,
"original_price": original_price
}
print(search_data)
result_count+=1
last_listing = listing_id
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
scrape_search_results() does the following when we extract the data:
- Finds all of our div elements: soup.find_all("div", class_="wt-height-full").
- We find the title with div_card.find("h3"). If the title doesn't exist, we skip this div.
- div_card.find("a") finds our a_tag. From here, we can extract our link and listing_id.
- div_card.find("span", class_="wt-text-title-small") tells us whether stars are present. If they are, we assign them to stars; if not, we assign a default rating of 0.0.
- div_card.find("span", class_="currency-symbol") finds our currency_holder. If a currency_holder is present, we save the currency symbol.
- We then extract the current_price and original_price so we can see whether an item is on sale.
Step 2: Add Pagination
As mentioned earlier, pagination is controlled with a simple parameter, page. We also need a function that allows us to crawl multiple pages. Take a look at the snippet below, start_scrape().
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
When we put all of it together, we get a script that looks like this.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="wt-height-full")
result_count = 0
last_listing = ""
for div_card in div_cards:
title = div_card.find("h3")
if not title:
continue
name = title.get("title")
a_tag = div_card.find("a")
listing_id = a_tag.get("data-listing-id")
if listing_id == last_listing:
continue
link = a_tag.get("href")
stars = 0.0
has_stars = div_card.find("span", class_="wt-text-title-small")
if has_stars:
stars = float(has_stars.text)
currency = "n/a"
currency_holder = div_card.find("span", class_="currency-symbol")
if currency_holder:
currency = currency_holder.text
prices = div_card.find_all("span", class_="currency-value")
if len(prices) < 1:
continue
current_price = prices[0].text
original_price = current_price
if len(prices) > 1:
original_price = prices[1].text
search_data = {
"name": name,
"stars": stars,
"url": link,
"price_currency": currency,
"listing_id": listing_id,
"current_price": current_price,
"original_price": original_price
}
print(search_data)
result_count+=1
last_listing = listing_id
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- Our URL now contains a parameter for our pagination: https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}
- start_scrape() allows us to crawl a list of pages instead of just the first page.
Step 3: Storing the Scraped Data
Storing our data is going to take a little OOP (Object-Oriented Programming). We need a dataclass to represent our search objects. We also need a DataPipeline to store these dataclass objects inside a CSV file. Our dataclass will be called SearchData.
Here is our SearchData. It holds all of the information we've been extracting in our previous two iterations.
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
price_currency: str = ""
listing_id: int = 0
current_price: float = 0.0
original_price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is the DataPipeline. It takes in a dataclass and stores it to a CSV file. It also filters out duplicates using the name attribute.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
In our full code example below, we open a DataPipeline inside of our main. The DataPipeline gets passed into start_scrape(), which in turn passes it to scrape_search_results().
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
price_currency: str = ""
listing_id: int = 0
current_price: float = 0.0
original_price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="wt-height-full")
result_count = 0
last_listing = ""
for div_card in div_cards:
title = div_card.find("h3")
if not title:
continue
name = title.get("title")
a_tag = div_card.find("a")
listing_id = a_tag.get("data-listing-id")
if listing_id == last_listing:
continue
link = a_tag.get("href")
stars = 0.0
has_stars = div_card.find("span", class_="wt-text-title-small")
if has_stars:
stars = float(has_stars.text)
currency = "n/a"
currency_holder = div_card.find("span", class_="currency-symbol")
if currency_holder:
currency = currency_holder.text
prices = div_card.find_all("span", class_="currency-value")
if len(prices) < 1:
continue
current_price = prices[0].text
original_price = current_price
if len(prices) > 1:
original_price = prices[1].text
search_data = SearchData(
name=name,
stars=stars,
url=link,
price_currency=currency,
listing_id=listing_id,
current_price=current_price,
original_price=original_price
)
data_pipeline.add_data(search_data)
result_count+=1
last_listing = listing_id
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- SearchData is used to represent individual search result objects on the page.
- DataPipeline opens a pipe to a CSV file. It is then used to save SearchData objects to the CSV.
Step 4: Adding Concurrency
When working with concurrency, multithreading comes in very handy. To crawl multiple pages concurrently, we'll use ThreadPoolExecutor to run our parsing function on multiple threads simultaneously.
To accomplish this, we're going to replace the for loop in start_scrape() with ThreadPoolExecutor.
Here is our rewritten start_scrape() function.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
- scrape_search_results is the function we wish to call on each thread.
- All other arguments get passed in as lists.
- executor.map() takes each item from each list and passes it into scrape_search_results (see the small example below).
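If executor.map() with multiple iterables is new to you, here's a tiny standalone illustration of how the arguments get paired up (not part of the scraper itself):
import concurrent.futures

def add(a, b):
    return a + b

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Items are paired positionally: add(1, 10), add(2, 20), add(3, 30)
    results = executor.map(add, [1, 2, 3], [10, 20, 30])
    print(list(results))  # [11, 22, 33]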
Here is our full code.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
price_currency: str = ""
listing_id: int = 0
current_price: float = 0.0
original_price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="wt-height-full")
result_count = 0
last_listing = ""
for div_card in div_cards:
title = div_card.find("h3")
if not title:
continue
name = title.get("title")
a_tag = div_card.find("a")
listing_id = a_tag.get("data-listing-id")
if listing_id == last_listing:
continue
link = a_tag.get("href")
stars = 0.0
has_stars = div_card.find("span", class_="wt-text-title-small")
if has_stars:
stars = float(has_stars.text)
currency = "n/a"
currency_holder = div_card.find("span", class_="currency-symbol")
if currency_holder:
currency = currency_holder.text
prices = div_card.find_all("span", class_="currency-value")
if len(prices) < 1:
continue
current_price = prices[0].text
original_price = current_price
if len(prices) > 1:
original_price = prices[1].text
search_data = SearchData(
name=name,
stars=stars,
url=link,
price_currency=currency,
listing_id=listing_id,
current_price=current_price,
original_price=original_price
)
data_pipeline.add_data(search_data)
result_count+=1
last_listing = listing_id
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- start_scrape() now runs our parsing function on multiple pages concurrently.
- ThreadPoolExecutor gives us the ability to run any function on multiple threads.
Step 5: Bypassing Anti-Bots
Now, it's time to write our proxy function. When we're dealing with Etsy, we need a much stronger anti-bot bypass than a traditional proxy connection provides, and that extra strength is more expensive. Take a look at our typical proxy function below.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
We're going to add the bypass argument to this function. There is a whole slew of different values we can pass in here. generic_level_4 is the strongest, and it costs 85 API credits per use. This makes our proxy connection 85 times more expensive than a standard proxy request with ScrapeOps!
You can view the other bypass options here.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"bypass": "generic_level_4",
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
With our proxy connection written, we are now ready for production! Take a look below and see our finalized crawler.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"bypass": "generic_level_4",
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
price_currency: str = ""
listing_id: int = 0
current_price: float = 0.0
original_price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="wt-height-full")
result_count = 0
last_listing = ""
for div_card in div_cards:
title = div_card.find("h3")
if not title:
continue
name = title.get("title")
a_tag = div_card.find("a")
listing_id = a_tag.get("data-listing-id")
if listing_id == last_listing:
continue
link = a_tag.get("href")
stars = 0.0
has_stars = div_card.find("span", class_="wt-text-title-small")
if has_stars:
stars = float(has_stars.text)
currency = "n/a"
currency_holder = div_card.find("span", class_="currency-symbol")
if currency_holder:
currency = currency_holder.text
prices = div_card.find_all("span", class_="currency-value")
if len(prices) < 1:
continue
current_price = prices[0].text
original_price = current_price
if len(prices) > 1:
original_price = prices[1].text
search_data = SearchData(
name=name,
stars=stars,
url=link,
price_currency=currency,
listing_id=listing_id,
current_price=current_price,
original_price=original_price
)
data_pipeline.add_data(search_data)
result_count+=1
last_listing = listing_id
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Now that our crawler is ready to go, it's time to test it out in production! For the sake of saving API credits, we'll only crawl one page. You can change this to more if you'd like.
Feel free to change any of the following in the main:
- keyword_list: Contains a list of keywords to be searched and scraped.
- MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
- PAGES: Specifies the number of pages to scrape for each keyword.
- LOCATION: Defines the geographic location from which the scraping requests appear to originate.
Here is our main.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
You can view our results below.
It took 28.802 seconds to crawl a single page. This is primarily because of the anti-bot bypass.
Build An Etsy Scraper
Now that we're effectively crawling Etsy and extracting items, we need to scrape their reviews. This allows us to collect data on the general consumer sentiment toward each item. We'll build it in the following steps:
- Building a review parser.
- Reading the CSV file.
- Adding data storage.
- Adding concurrency.
- Integrating with a proxy.