How to Scrape Etsy With Selenium
Etsy hosts a huge catalog of products from small businesses. Search for an item and you'll find seemingly endless listings that match your query, almost all of them created by small business owners.
Scraping Etsy gives us access to a massive dataset of consumer products from many different sellers. When we scrape a large set of these products and their reviews, we can build a pretty clear picture of consumer sentiment around them.
Today, we'll be scraping coffee mugs, but the concepts learned here can be applied to scrape pretty much anything you want from Etsy.
- TLDR: How to Scrape Etsy
- How To Architect Our Etsy Scraper
- Understanding How To Scrape Etsy
- Setting Up Our Etsy Scraper
- Build An Etsy Search Crawler
- Build An Etsy Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Etsy
If you don't have time to read, we've got you covered.
The Python script below performs a crawl based on a keyword search. After the crawl, it goes through and scrapes reviews for each item found in the crawl.
To use this script:
- Create a new project folder.
- After you've made your new folder, make a config.json file.
- Inside the config file, add your API key: {"api_key": "your-super-secret-api-key"}.
- Once you've done that, copy and paste the code below into a new Python file.
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"bypass": "generic_level_4",
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
listing_id: int = 0
price_currency: str = ""
price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
review: str = ""
stars: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
prefs = {
"profile.managed_default_content_settings.javascript": 2,
"profile.managed_default_content_settings.stylesheets": 2
}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Successfully pinged {url}")
content = driver.page_source
script_tag_begin_index = content.find('"itemListElement"')
script_tag_end_index = content.find('"numberOfItems"')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}"
json_data = json.loads(json_string)
list_elements = json_data["itemListElement"]
for element in list_elements:
name = element["name"]
link = element["url"]
listing_id = link.split("/")[-2]
currency = element["offers"]["priceCurrency"]
price = element["offers"]["price"]
search_data = SearchData(
name=name,
url=link,
listing_id=listing_id,
price_currency=currency,
price=float(price)
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
print("getting", url)
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
driver.get((get_scrapeops_url(url, location=location)))
logger.info(f"successfully pinged: {url}")
try:
content = driver.page_source
script_tag_begin_index = content.find('"review":')
script_tag_end_index = content.find('}}]')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index] + "}}]}"
json_data = json.loads(json_string)
list_elements = json_data["review"]
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv")
for element in list_elements:
review_data = ReviewData(
name=element["author"]["name"],
date=element["datePublished"],
review=element["reviewBody"],
stars=element["reviewRating"]["ratingValue"]
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To customize your results, feel free to change any of the following:
- MAX_RETRIES: Controls the number of retry attempts in case of an error.
- MAX_THREADS: Defines the number of threads to use during concurrent scraping.
- PAGES: The number of pages to scrape for each keyword.
- LOCATION: Controls the location (country) to simulate browsing from a specific region.
- keyword_list: A list of keywords you want to scrape.
WARNING: This code uses the bypass setting generic_level_4. It costs 85 API credits per call, which makes this configuration significantly more expensive than standard requests to the ScrapeOps API.
How To Architect Our Etsy Scraper
Etsy's anti-bot protections are among the strongest out there. To get past them, we'll be using the ScrapeOps Proxy Aggregator's bypass argument. This feature will get us through even the toughest anti-bots, but the bypass parameter costs extra API credits, so this is an expensive scrape to run.
At a high level, this is going to be pretty similar to other scraping projects from this series. We need to build both a search crawler and a review scraper.
We'll build the search crawler in the following steps:
- Parsing Etsy search results.
- Controlling our search results through pagination.
- Storing our extracted search results.
- Parsing pages and storing data with concurrency.
- Bypassing anti-bots with proxy integration.
Then, we're going to build a review scraper following these steps:
- Parsing product reviews.
- Reading the crawler report so we can parse reviews for each product.
- Adding data storage for our extracted reviews.
- Parsing reviews and storing data concurrently.
- Using proxy integration to once again bypass anti-bots.
Understanding How To Scrape Etsy
Before diving head first into code, we need to plan out how we're going to build our scraper. We'll work through all of the following before we start coding.
- How To Request Etsy Pages
- How To Extract Data From Etsy
- How To Control Pagination
- How To Control Our Geolocation
Step 1: How To Request Etsy Pages
Just like any other site, we start with a simple GET request.
- Whenever you visit a site, your browser performs a GET request.
- Your browser then receives an HTML page as a response.
- The browser reads this HTML and renders the webpage for you to view.
When we access Etsy with Selenium, we'll also be performing a GET request under the hood. With Selenium, we can do this with driver.get().
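Here's a minimal sketch of what that looks like. The search URL is just an example, and a plain request like this will usually get blocked by Etsy's anti-bots; we'll deal with that later using the proxy.
from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
# Under the hood, this performs a GET request and loads the response into the browser.
driver.get("https://www.etsy.com/search?q=coffee+mug")
print(driver.page_source[:500])  # first 500 characters of the HTML we got back
driver.quit()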
Take a look at the screenshot below. This shot contains an Etsy search results page. Take a look at the URL:
https://www.etsy.com/search?q=coffee+mug&ref=pagination&page=2
We're going to ignore the pagination at the moment and just focus on the rest of the URL:
https://www.etsy.com/search?q=coffee+mug&ref=pagination
- q=coffee+mug represents our search query. q is for "query" and coffee+mug represents the value, "coffee mug".
Our reconstructed URLs will look like this:
https://www.etsy.com/search?q={formatted_keyword}&ref=pagination
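As a quick illustration, here's how that keyword formatting looks in Python. build_search_url is just a throwaway helper for this example, not part of the final scraper.
def build_search_url(keyword):
    # Etsy expects spaces in the query to be replaced with "+"
    formatted_keyword = keyword.replace(" ", "+")
    return f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination"

print(build_search_url("coffee mug"))
# https://www.etsy.com/search?q=coffee+mug&ref=pagination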
During our crawl, we'll find all of our product URLs. If you scroll down a product page far enough, you'll see the reviews. From the product page, these are what we want to extract.
Step 2: How To Extract Data From Etsy Results and Pages
Extracting data from Etsy via Selenium can be a pretty painful process. If you gain access to Etsy via Selenium, your page will actually look something like this.
As you can see, the page doesn't even render properly. All we have is one massive string.
Embedded within this massive string lies some JSON that is normally used to render the page. You can see an example of that JSON below as it's laid out in a normal page.
To extract this JSON from Selenium's broken version of the webpage (laid out like the one you see above), we're going to manually dig through this giant string and pull out the JSON.
Extracting our reviews won't be any easier. With Selenium, we'll get the same type of corrupted page as a response. We'll filter through the text until we find the beginning and end of our JSON, and then we'll handle the JSON from there.
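Here's the general idea as a minimal sketch. The raw_page string below is a made-up stand-in for the page source Selenium hands back; the slicing logic mirrors what we'll use in the real scraper.
import json

# Made-up stand-in for the broken page source returned by Selenium.
raw_page = ('...junk before..."itemListElement":[{"name":"Example Mug",'
            '"url":"https://www.etsy.com/listing/123/example-mug",'
            '"offers":{"priceCurrency":"USD","price":"14.99"}}],'
            '"numberOfItems":1,...junk after...')

# Find where the item list begins and ends, then rebuild valid JSON around it.
begin = raw_page.find('"itemListElement"')
end = raw_page.find('"numberOfItems"')
json_string = "{" + raw_page[begin:end-1] + "}"

data = json.loads(json_string)
for item in data["itemListElement"]:
    print(item["name"], item["offers"]["price"])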
Step 3: How To Control Pagination
Pagination is one of the easiest parts of this whole scraping job. When we looked at our URL, we ignored the page parameter. This one is pretty simple.
If we add page={page_number+1}
to our URL, we can control the page number of our results. We use page_number+1
because Python's range()
begins counting at 0 but our page numbers begin at 1.
With support for pagination added, our URLs look like this:
https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}
Note: ref=pagination might look like it's relevant to our pagination, but it's not. ref is short for referrer or referral.
ref=pagination tells Etsy that we were referred to the page via their pagination system. This parameter makes us look less like a bot.
A normal person is going to visit page 2 by clicking the page 2 button, which gives us a referral to the page using the pagination.
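Here's a quick sketch of how those page numbers translate into URLs:
keyword = "coffee mug"
formatted_keyword = keyword.replace(" ", "+")

# range(3) yields 0, 1 and 2, so we add 1 to match Etsy's 1-based page numbers.
for page_number in range(3):
    url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
    print(url)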
Step 4: Geolocated Data
Geolocated data is pretty important. We need it to get consistent results. Without proper geolocation, we could be getting results in dollars on one page, in pounds on another and in euros on another.
For geolocation, we'll be using the country
parameter with the ScrapeOps API.
- When talking to the ScrapeOps API, we can pass "country": "us" if we want to appear in the US.
- If we want to appear in the UK, we can pass "country": "uk".
You can view our full list of supported countries here.
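For example, here's roughly what the ScrapeOps payload looks like when we want to appear in the UK. The API key and target URL are placeholders.
from urllib.parse import urlencode

payload = {
    "api_key": "your-super-secret-api-key",
    "url": "https://www.etsy.com/search?q=coffee+mug",
    "country": "uk"  # appear to be browsing from the UK
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))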
Setting Up Our Etsy Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir etsy-scraper
cd etsy-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install selenium
Check to make sure you've got the latest version of Chromedriver installed. You can check this page.
Build An Etsy Search Crawler
Time to build our Etsy crawler! This crawler needs to perform a search. Then, it needs to parse these results and save all of our relevant data to a CSV file. Once optimized, this will also utilize concurrency for speed and proxy integration to get past Etsy's anti-bots.
- Parsing
- Pagination
- Data Storage
- Concurrency
- Proxy Integration
Step 1: Create Simple Search Data Parser
Time to build our parser. In this version of our script, we're going to add our basic structure with error handling, retry logic, and a parsing function.
Everything in the code is important, but you really need to pay attention to the parsing logic.
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
prefs = {
"profile.managed_default_content_settings.javascript": 2,
"profile.managed_default_content_settings.stylesheets": 2
}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
try:
driver.get(url)
logger.info(f"Successfully pinged {url}")
content = driver.page_source
script_tag_begin_index = content.find('"itemListElement"')
script_tag_end_index = content.find('"numberOfItems"')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}"
json_data = json.loads(json_string)
list_elements = json_data["itemListElement"]
for element in list_elements:
print(element)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
scrape_search_results() does the following when we extract the data:
- content.find('"itemListElement"') gets the beginning of our item list.
- content.find('"numberOfItems"') finds the end of our item list.
- After we've found the beginning and end of the JSON we want, we add some characters to it so we can format it as proper JSON.
- Then, we print each item from the list.
Step 2: Add Pagination
As mentioned earlier, pagination is controlled with a simple parameter, page. We also need a function that allows us to crawl multiple pages.
Take a look at the snippet below: start_scrape().
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
After putting it together, we get a script that looks like this.
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
prefs = {
"profile.managed_default_content_settings.javascript": 2,
"profile.managed_default_content_settings.stylesheets": 2
}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
try:
driver.get(url)
logger.info(f"Successfully pinged {url}")
content = driver.page_source
script_tag_begin_index = content.find('"itemListElement"')
script_tag_end_index = content.find('"numberOfItems"')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}"
json_data = json.loads(json_string)
list_elements = json_data["itemListElement"]
for element in list_elements:
print(element)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- Our URL now contains a parameter for our pagination: https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}
- start_scrape() lets us call our parsing function on an entire list of pages.
Step 3: Storing the Scraped Data
We need to store our data. That's the whole purpose of our scrape. When we extract our data, we save it to a file that people can review later.
In this case, we're also going to write a scraper that reads this file. We'll use CSV as our storage format, which allows both people and programs to read the file easily.
Here is our SearchData. It holds all of the information we've been extracting in our previous two iterations.
@dataclass
class SearchData:
name: str = ""
url: str = ""
listing_id: int = 0
price_currency: str = ""
price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is the DataPipeline. It takes in a dataclass and stores it to a CSV file. It also filters out duplicates using the name attribute.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
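As a quick sanity check, here's a tiny usage sketch of these two pieces working together. The filename and listings are made up for illustration.
# Store two fake results, then flush anything left in the queue to disk.
pipeline = DataPipeline(csv_filename="example-mugs.csv")
pipeline.add_data(SearchData(name="Example Mug", url="https://www.etsy.com/listing/123/example-mug",
                             listing_id=123, price_currency="USD", price=14.99))
pipeline.add_data(SearchData(name="Another Mug", url="https://www.etsy.com/listing/456/another-mug",
                             listing_id=456, price_currency="USD", price=19.99))
pipeline.close_pipeline()  # writes the queued rows to example-mugs.csv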
In our full code example below, we open a DataPipeline inside of our main. The DataPipeline gets passed into start_scrape(), which in turn passes it to scrape_search_results().
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
listing_id: int = 0
price_currency: str = ""
price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
prefs = {
"profile.managed_default_content_settings.javascript": 2,
"profile.managed_default_content_settings.stylesheets": 2
}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
try:
driver.get(url)
logger.info(f"Successfully pinged {url}")
content = driver.page_source
script_tag_begin_index = content.find('"itemListElement"')
script_tag_end_index = content.find('"numberOfItems"')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}"
json_data = json.loads(json_string)
list_elements = json_data["itemListElement"]
for element in list_elements:
name = element["name"]
link = element["url"]
listing_id = link.split("/")[-2]
currency = element["offers"]["priceCurrency"]
price = element["offers"]["price"]
search_data = SearchData(
name=name,
url=link,
listing_id=listing_id,
price_currency=currency,
price=float(price)
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- SearchData represents individual search result objects on the page.
- DataPipeline opens a pipe to our CSV file. With the pipeline open, we can save SearchData objects to the CSV.
Step 4: Adding Concurrency
When working with concurrency, multithreading is one of the most useful tools around. To crawl multiple pages concurrently, we'll use ThreadPoolExecutor to run our parsing function on multiple threads at the same time.
To do this, we'll replace the for loop in start_scrape() with ThreadPoolExecutor.
Here is our rewritten start_scrape() function.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
- scrape_search_results is the function we wish to call on each thread.
- All other arguments get passed in as lists.
- executor.map() takes each item from each list and passes it into scrape_search_results.
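To see how executor.map() pairs those lists up, here's a toy example that has nothing to do with Etsy:
import concurrent.futures

def add(a, b):
    return a + b

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Runs add(1, 10), add(2, 20) and add(3, 30) across the thread pool.
    results = executor.map(add, [1, 2, 3], [10, 20, 30])

print(list(results))  # [11, 22, 33]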
Here is our full code adjusted for concurrency.
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
listing_id: int = 0
price_currency: str = ""
price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
prefs = {
"profile.managed_default_content_settings.javascript": 2,
"profile.managed_default_content_settings.stylesheets": 2
}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
try:
driver.get(url)
logger.info(f"Successfully pinged {url}")
content = driver.page_source
script_tag_begin_index = content.find('"itemListElement"')
script_tag_end_index = content.find('"numberOfItems"')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}"
json_data = json.loads(json_string)
list_elements = json_data["itemListElement"]
for element in list_elements:
name = element["name"]
link = element["url"]
listing_id = link.split("/")[-2]
currency = element["offers"]["priceCurrency"]
price = element["offers"]["price"]
search_data = SearchData(
name=name,
url=link,
listing_id=listing_id,
price_currency=currency,
price=float(price)
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- With start_scrape(), our parsing function now runs on multiple pages concurrently.
- ThreadPoolExecutor gives us the ability to run any function on multiple threads.
Step 5: Bypassing Anti-Bots
Time to write our proxy function. When dealing with Etsy, we need to bypass their anti-bots with a lot more strength than we get from a standard proxy request. This is more expensive.
Take a look at our typical proxy function below.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
We're going to add the bypass
argument to this function. There are a whole slew of different values we can pass in here. generic_level_4
is the strongest and it costs 85 API credits per use. This makes our proxy connection 85 times more expensive than a standard proxy with ScrapeOps!
You can view the other bypass
options here.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"bypass": "generic_level_4",
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
With our proxy connection written, we are now ready for production! Take a look below and see our finalized crawler.
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"bypass": "generic_level_4",
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
listing_id: int = 0
price_currency: str = ""
price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
prefs = {
"profile.managed_default_content_settings.javascript": 2,
"profile.managed_default_content_settings.stylesheets": 2
}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Successfully pinged {url}")
content = driver.page_source
script_tag_begin_index = content.find('"itemListElement"')
script_tag_end_index = content.find('"numberOfItems"')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}"
json_data = json.loads(json_string)
list_elements = json_data["itemListElement"]
for element in list_elements:
name = element["name"]
link = element["url"]
listing_id = link.split("/")[-2]
currency = element["offers"]["priceCurrency"]
price = element["offers"]["price"]
search_data = SearchData(
name=name,
url=link,
listing_id=listing_id,
price_currency=currency,
price=float(price)
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Now that our crawler is ready to go, it's time to test it out in production! For the sake of saving API credits, we'll only crawl one page. You can change this to more if you'd like.
Feel free to change any of the following in the main
:
- MAX_RETRIES
- MAX_THREADS
- PAGES
- LOCATION
- keyword_list
Here is our main.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
You can view our results below.
It took 24.339 seconds to crawl a single page. This is primarily because of the anti-bot bypass. When we ping Etsy through ScrapeOps, ScrapeOps solves any anti-bot challenges that come our way.
Build An Etsy Scraper
We're now crawling items from Etsy. Time to move on to the next thing on our to-do list: a review scraper. This scraper is going to read our crawler results and scrape reviews for each item we crawled.
- Building a review parser.
- Reading the CSV file.
- Adding data storage.
- Adding concurrency.
- Integrating with a proxy.
Step 1: Create Simple Review Data Parser
We'll start by parsing reviews for each item. This function should look a bit familiar. It bears the same structure as our first parser. We have error handling, retries, and our parsing logic.
You can view it in the code snippet below. Once again, we also find our review data embedded in JSON on the page.
def process_item(row, location, retries=3):
url = row["url"]
print("getting", url)
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
driver.get(url)
logger.info(f"successfully pinged: {url}")
try:
content = driver.page_source
script_tag_begin_index = content.find('"review":')
script_tag_end_index = content.find('}}]')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index] + "}}]}"
json_data = json.loads(json_string)
list_elements = json_data["review"]
for element in list_elements:
print(element)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
- We find our reviews exactly the same way we found our products earlier.
- After rebuilding our JSON, we print each review object to the terminal.
Step 2: Loading URLs To Scrape
Loading URLs is easy. We need to read our CSV file into an array of dict objects. We use Python's built-in csv.DictReader to do this.
Take a look at the snippet below. It's roughly the equivalent of start_scrape() from our crawl.
This is our process_results() function.
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
After fitting things together, our code looks like this.
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"bypass": "generic_level_4",
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
listing_id: int = 0
price_currency: str = ""
price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
prefs = {
"profile.managed_default_content_settings.javascript": 2,
"profile.managed_default_content_settings.stylesheets": 2
}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Successfully pinged {url}")
content = driver.page_source
script_tag_begin_index = content.find('"itemListElement"')
script_tag_end_index = content.find('"numberOfItems"')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}"
json_data = json.loads(json_string)
list_elements = json_data["itemListElement"]
for element in list_elements:
name = element["name"]
link = element["url"]
listing_id = link.split("/")[-2]
currency = element["offers"]["priceCurrency"]
price = element["offers"]["price"]
search_data = SearchData(
name=name,
url=link,
listing_id=listing_id,
price_currency=currency,
price=float(price)
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
print("getting", url)
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
driver.get(url)
logger.info(f"successfully pinged: {url}")
try:
content = driver.page_source
script_tag_begin_index = content.find('"review":')
script_tag_end_index = content.find('}}]')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index] + "}}]}"
json_data = json.loads(json_string)
list_elements = json_data["review"]
for element in list_elements:
print(element)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- process_item() is used to parse reviews for the individual items found during our crawl.
- process_results() is used to read the CSV file and call process_item() on each row from the file.
Step 3: Storing the Scraped Data
Like our search results, we need to store our review data. DataPipeline already gives us the ability to store dataclass objects in a CSV file. We just need a new dataclass.
We'll call this one ReviewData. It's almost exactly like SearchData.
@dataclass
class ReviewData:
name: str = ""
date: str = ""
review: str = ""
stars: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
In the full code below, we open a DataPipeline
from inside our parsing function. When we parse our reviews, we turn them into ReviewData
objects and then we pass those objects into the pipeline.
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"bypass": "generic_level_4",
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
listing_id: int = 0
price_currency: str = ""
price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
review: str = ""
stars: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
prefs = {
"profile.managed_default_content_settings.javascript": 2,
"profile.managed_default_content_settings.stylesheets": 2
}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Successfully pinged {url}")
content = driver.page_source
script_tag_begin_index = content.find('"itemListElement"')
script_tag_end_index = content.find('"numberOfItems"')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}"
json_data = json.loads(json_string)
list_elements = json_data["itemListElement"]
for element in list_elements:
name = element["name"]
link = element["url"]
listing_id = link.split("/")[-2]
currency = element["offers"]["priceCurrency"]
price = element["offers"]["price"]
search_data = SearchData(
name=name,
url=link,
listing_id=listing_id,
price_currency=currency,
price=float(price)
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
print("getting", url)
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
driver.get(url)
logger.info(f"successfully pinged: {url}")
try:
content = driver.page_source
script_tag_begin_index = content.find('"review":')
script_tag_end_index = content.find('}}]')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index] + "}}]}"
json_data = json.loads(json_string)
list_elements = json_data["review"]
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv")
for element in list_elements:
review_data = ReviewData(
name=element["author"]["name"],
date=element["datePublished"],
review=element["reviewBody"],
stars=element["reviewRating"]["ratingValue"]
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- From inside our parsing function, we open a new `DataPipeline`.
- We turn our reviews into `ReviewData` objects.
- Every `ReviewData` object gets passed into the pipeline.
- Once we've finished parsing the page, we close the pipeline (see the minimal sketch below).
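If that flow feels abstract, here is a minimal sketch of the same pattern in isolation. It assumes the `DataPipeline` and `ReviewData` classes defined earlier in this script, and the review values are made-up placeholders, not real Etsy data.

```
# Minimal sketch of the per-item review flow (placeholder data, not real reviews).
# Assumes the DataPipeline and ReviewData classes defined above.

item_name = "example-coffee-mug"

# 1. Open a new DataPipeline just for this item's reviews.
review_pipeline = DataPipeline(csv_filename=f"{item_name}.csv")

# 2. Turn each raw review into a ReviewData object.
raw_reviews = [
    {"author": "Jane D.", "date": "2024-01-15", "body": "Lovely mug!", "stars": 5},
    {"author": "Sam K.", "date": "2024-02-02", "body": "Arrived chipped.", "stars": 2},
]
for raw in raw_reviews:
    review = ReviewData(
        name=raw["author"],
        date=raw["date"],
        review=raw["body"],
        stars=raw["stars"]
    )
    # 3. Every ReviewData object gets passed into the pipeline.
    review_pipeline.add_data(review)

# 4. Once we're done with the page, close the pipeline to flush the remaining rows to CSV.
review_pipeline.close_pipeline()
```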
Step 4: Adding Concurrency
Adding concurrency is done pretty much the same way we did it earlier. We replace a `for` loop with the faster, more efficient `ThreadPoolExecutor`.
Here is our rewritten `process_results()` function.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
Our arguments are structured similarly to how they were earlier:
- `process_item`: the function we want to call on each available thread.
- `reader`: the list of `dict` objects we want to process.
- All other arguments once again get passed in as lists as well (see the sketch after this list).
Step 5: Bypassing Anti-Bots
No scraper is complete without anti-bot bypasses. We already have a proxy function that does this. We just need to use it.
We'll change one line of our parser and we should be good to go.
driver.get(get_scrapeops_url(url, location=location))
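If you're curious what this one line actually hands to Selenium, `get_scrapeops_url()` simply wraps our target URL inside a ScrapeOps Proxy Aggregator URL. Here is a rough sketch of the result; the API key and listing URL below are placeholders, and the exact parameter order may differ.

```
from urllib.parse import urlencode

# Placeholder values for illustration only.
api_key = "your-super-secret-api-key"
target_url = "https://www.etsy.com/listing/1234567890/example-coffee-mug"

payload = {
    "api_key": api_key,
    "url": target_url,
    "bypass": "generic_level_4",
    "country": "us"
}

# urlencode() escapes the target URL so it can ride along as a query parameter.
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))
# https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.etsy.com%2F...
```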
Here is our full production code containing both the crawler and scraper.
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"bypass": "generic_level_4",
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
listing_id: int = 0
price_currency: str = ""
price: float = 0.0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
review: str = ""
stars: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
            sleep(3)  # wait for any in-progress CSV write to finish (we import sleep from time)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
prefs = {
"profile.managed_default_content_settings.javascript": 2,
"profile.managed_default_content_settings.stylesheets": 2
}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Successfully pinged {url}")
content = driver.page_source
script_tag_begin_index = content.find('"itemListElement"')
script_tag_end_index = content.find('"numberOfItems"')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}"
json_data = json.loads(json_string)
list_elements = json_data["itemListElement"]
for element in list_elements:
name = element["name"]
link = element["url"]
listing_id = link.split("/")[-2]
currency = element["offers"]["priceCurrency"]
price = element["offers"]["price"]
search_data = SearchData(
name=name,
url=link,
listing_id=listing_id,
price_currency=currency,
price=float(price)
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
print("getting", url)
tries = 0
success = False
while tries <= retries and not success:
options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
        driver.get(get_scrapeops_url(url, location=location))
logger.info(f"successfully pinged: {url}")
try:
content = driver.page_source
script_tag_begin_index = content.find('"review":')
script_tag_end_index = content.find('}}]')
json_string = "{"+ content[script_tag_begin_index:script_tag_end_index] + "}}]}"
json_data = json.loads(json_string)
list_elements = json_data["review"]
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv")
for element in list_elements:
review_data = ReviewData(
name=element["author"]["name"],
date=element["datePublished"],
review=element["reviewBody"],
stars=element["reviewRating"]["ratingValue"]
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Time to see how everything runs in production from start to finish. Once again, we'll be crawling one page. This program is expensive to run: every item from the crawl gets its own headless Chrome session routed through the proxy. You can once again view our `main` block below.
Feel free to change any of the constants to tweak your results, just like before.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here are the results from our crawl and scrape. The crawler spat out a CSV with 8 results and the full run took 69.01 seconds.
If you remember from earlier, our crawl took 24.339 seconds.
69.01 - 24.339 = 44.671 seconds. 44.671 seconds / 8 items = 5.584 seconds per item.
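If you want to reproduce this arithmetic on your own runs, a quick back-of-the-envelope check looks like this; substitute your own timings and item count.

```
# Back-of-the-envelope throughput check; plug in your own numbers.
total_runtime = 69.01      # seconds for the full crawl + scrape
crawl_runtime = 24.339     # seconds for the crawl alone
items_scraped = 8

scrape_runtime = total_runtime - crawl_runtime        # 44.671 seconds
seconds_per_item = scrape_runtime / items_scraped     # ~5.584 seconds per item
print(f"{scrape_runtime:.3f} seconds scraping, {seconds_per_item:.3f} seconds per item")
```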
Legal and Ethical Considerations
In this tutorial, our Etsy scrape has been completely legal. Public data is generally legal to scrape no matter where you are. Private data (data behind a login) is a completely different story and you are subject to a whole slew of privacy and intellectual property laws.
While scraping Etsy data might be legal, Etsy does explicitly prohibit scraping. You need to be aware of both their Terms of Use and their `robots.txt`.
Violating these policies can lead to suspension and even a permanent ban from the site.
You can view each of those documents below.
Conclusion
While Etsy is a very difficult site to scrape, we can do it. When we scrape Etsy, we make full use of the ScrapeOps Proxy Aggregator's anti-bot bypass system.
You should now have a solid understanding of our iterative build process and the features added: parsing, pagination, data storage, concurrency, and proxy integration.
If you want to know more about the tech stack used in this article, check out the links below.
More Python Web Scraping Guides
Here at ScrapeOps, we wrote the playbook on web scraping with Selenium. We have tons of resources for you to learn from.
Whether you're a seasoned dev, or you're brand new to coding, we have something for you. Keep reading our content and level up your scraping skillset!
If you want more from our "How To Scrape" series, take a look at one of the links below.