Then check out ScrapeOps, the complete toolkit for web scraping.
You'll also need a ScrapeOps API key, stored in a config.json file.

{"api_key": "your-super-secret-api-key"}
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "bypass": "generic_level_4",
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    price_currency: str = ""
    listing_id: int = 0
    current_price: float = 0.0
    original_price: float = 0.0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    review: str = ""
    stars: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="wt-height-full")

            result_count = 0
            last_listing = ""

            for div_card in div_cards:
                title = div_card.find("h3")
                if not title:
                    continue
                name = title.get("title")
                a_tag = div_card.find("a")
                listing_id = a_tag.get("data-listing-id")
                if listing_id == last_listing:
                    continue
                link = a_tag.get("href")
                stars = 0.0
                has_stars = div_card.find("span", class_="wt-text-title-small")
                if has_stars:
                    stars = float(has_stars.text)
                currency = "n/a"
                currency_holder = div_card.find("span", class_="currency-symbol")
                if currency_holder:
                    currency = currency_holder.text
                prices = div_card.find_all("span", class_="currency-value")
                if len(prices) < 1:
                    continue
                current_price = prices[0].text
                original_price = current_price
                if len(prices) > 1:
                    original_price = prices[1].text

                search_data = SearchData(
                    name=name,
                    stars=stars,
                    url=link,
                    price_currency=currency,
                    listing_id=listing_id,
                    current_price=current_price,
                    original_price=original_price
                )

                data_pipeline.add_data(search_data)
                result_count += 1
                last_listing = listing_id

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = []
                for review_rank in range(4):
                    card = soup.select_one(f"div[id='review-text-width-{review_rank}']")
                    if card:
                        review_cards.append(card)

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv")

                for review_card in review_cards:
                    rating = review_card.select_one("input[name='rating']").get("value")
                    review = review_card.find("p").text.strip()
                    name_date_holder = review_card.find("a", class_="wt-text-link wt-mr-xs-1")
                    if not name_date_holder:
                        continue
                    name = name_date_holder.get("aria-label").replace("Reviewer ", "")
                    if not name:
                        name = "n/a"
                    date = name_date_holder.parent.text.strip().replace(name, "")
                    if date == "":
                        continue

                    review_data = ReviewData(
                        name=name,
                        date=date,
                        review=review,
                        stars=rating
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["coffee mug"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
- keyword_list: Contains a list of keywords to be searched and scraped.
- MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
- PAGES: Specifies the number of pages to scrape for each keyword.
- LOCATION: Defines the geographic location from which the scraping requests appear to originate.

This code uses the bypass setting of generic_level_4, which costs 85 API credits per call, so this configuration costs significantly more than standard requests to the ScrapeOps API. We need the bypass method in order to get past Etsy's anti-bots, and that extra cost makes this an expensive scrape to run.

Conceptually, this is pretty similar to other scraping projects we've made in this series. We need both a search crawler and a review scraper. We'll build the search crawler in the steps that follow.

Take a look at this Etsy search URL:

https://www.etsy.com/search?q=coffee+mug&ref=pagination&page=2
If we remove the page number, the URL looks like this:

https://www.etsy.com/search?q=coffee+mug&ref=pagination

q=coffee+mug represents our query. q is for "query" and coffee+mug represents the value, "coffee mug". Our reconstructed URLs will look like this:

https://www.etsy.com/search?q={formatted_keyword}&ref=pagination
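If you'd like to see how a plain keyword turns into that q parameter, here is a minimal sketch. The helper name and the keyword are just for illustration, not part of the scraper we build below.

from urllib.parse import urlencode

def build_search_url(keyword):
    # Etsy expects spaces in the q parameter encoded as "+";
    # urlencode() handles that for us.
    query_string = urlencode({"q": keyword, "ref": "pagination"})
    return f"https://www.etsy.com/search?{query_string}"

print(build_search_url("coffee mug"))
# https://www.etsy.com/search?q=coffee+mug&ref=pagination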
Each search result is embedded in a div with the class of wt-height-full. This is difficult because there are other non-product items on the page embedded within div elements of this same kind. When we build our crawler, we'll need to add some code to filter these elements out of our search. Take a look at the page below; it shows a result item like this highlighted using inspect.

Each review is embedded in a div with an id of review-text-width-0. The first review on the page gets the id review-text-width-0, the next one gets review-text-width-1, and the number increments continually with each review.
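To make that concrete, here is a hedged sketch of how those incrementing ids can be selected with BeautifulSoup. The HTML string is invented for illustration only.

from bs4 import BeautifulSoup

# Invented markup that mimics Etsy's incrementing review ids
html = """
<div id="review-text-width-0"><p>Great mug!</p></div>
<div id="review-text-width-1"><p>Arrived quickly.</p></div>
"""
soup = BeautifulSoup(html, "html.parser")

for review_rank in range(4):
    card = soup.select_one(f"div[id='review-text-width-{review_rank}']")
    if not card:
        break  # no more reviews on the page
    print(review_rank, card.find("p").text)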
To control our pagination, we use the page parameter. This one is pretty simple: we add page={page_number+1} to our URL. We use page_number+1 because Python's range() begins counting at 0 but our page numbers begin at 1. Our fully formatted URL with pagination would look like this:

https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}
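As a quick sanity check, this small sketch shows how range() maps onto Etsy's 1-based page numbers. The keyword is only an example.

formatted_keyword = "coffee mug".replace(" ", "+")

# range(3) yields 0, 1, 2 -> Etsy pages 1, 2, 3
for page_number in range(3):
    url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
    print(url)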
ref=pagination might look like it's relevant to our pagination, however it is not. ref is short for referrer or referral. ref=pagination tells Etsy that we were referred to the page via their pagination system. Leaving this parameter in makes us look less like a bot. A normal person is going to visit page 2 by clicking the page 2 button, which gives us a referral to the page using the pagination.

To control our geolocation, we use the country parameter with the ScrapeOps Proxy API. We pass "country": "us" if we want to appear in the US. If we want to appear in the UK, we pass "country": "uk".
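Here is a small sketch of how that country value ends up in the proxy payload; it previews the same payload layout we use later in this article, with a placeholder API key.

from urllib.parse import urlencode

def proxy_url_for(url, location):
    # Only the "country" value changes between locations
    payload = {"api_key": "YOUR-API-KEY", "url": url, "country": location}
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

print(proxy_url_for("https://www.etsy.com/search?q=coffee+mug", "us"))
print(proxy_url_for("https://www.etsy.com/search?q=coffee+mug", "uk"))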
To get started, create a new project folder, then set up a virtual environment and install the dependencies.

mkdir etsy-scraper
cd etsy-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="wt-height-full")

            result_count = 0
            last_listing = ""

            for div_card in div_cards:
                title = div_card.find("h3")
                if not title:
                    continue
                name = title.get("title")
                a_tag = div_card.find("a")
                listing_id = a_tag.get("data-listing-id")
                if listing_id == last_listing:
                    continue
                link = a_tag.get("href")
                stars = 0.0
                has_stars = div_card.find("span", class_="wt-text-title-small")
                if has_stars:
                    stars = float(has_stars.text)
                currency = "n/a"
                currency_holder = div_card.find("span", class_="currency-symbol")
                if currency_holder:
                    currency = currency_holder.text
                prices = div_card.find_all("span", class_="currency-value")
                if len(prices) < 1:
                    continue
                current_price = prices[0].text
                original_price = current_price
                if len(prices) > 1:
                    original_price = prices[1].text

                search_data = {
                    "name": name,
                    "stars": stars,
                    "url": link,
                    "price_currency": currency,
                    "listing_id": listing_id,
                    "current_price": current_price,
                    "original_price": original_price
                }

                print(search_data)
                result_count += 1
                last_listing = listing_id

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["coffee mug"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
scrape_search_results() does the following when we extract the data:

- We find all of the div elements: soup.find_all("div", class_="wt-height-full").
- We find the title with div_card.find("h3"). If the title doesn't exist, we skip this div.
- div_card.find("a") finds our a_tag. From here, we can extract our link and listing_id.
- div_card.find("span", class_="wt-text-title-small") finds whether or not we have stars present. If there are stars present, we assign them to our rating. If there are no stars present, we assign a default rating of 0.0.
- div_card.find("span", class_="currency-symbol") finds our currency_holder. If a currency_holder is present, we save the currency symbol.
- We save both the current_price and original_price so we can see if an item is on sale or not.
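Here is a self-contained sketch of that filtering logic running against some invented markup, so you can see why the h3 check matters. The HTML is made up for illustration and is much simpler than a real Etsy page.

from bs4 import BeautifulSoup

# Invented markup: one real listing card and one non-product card
html = """
<div class="wt-height-full">
  <h3 title="Handmade Coffee Mug"></h3>
  <a data-listing-id="123" href="https://www.etsy.com/listing/123"></a>
  <span class="currency-symbol">$</span>
  <span class="currency-value">24.99</span>
</div>
<div class="wt-height-full"><p>Not a product</p></div>
"""
soup = BeautifulSoup(html, "html.parser")

for div_card in soup.find_all("div", class_="wt-height-full"):
    title = div_card.find("h3")
    if not title:
        continue  # skip the non-product cards
    print(title.get("title"), div_card.find("a").get("data-listing-id"))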
Next, our parsing function needs to take in a page. We also need a function that allows us to crawl multiple pages. Take a look at the snippet below, start_scrape().
def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="wt-height-full")

            result_count = 0
            last_listing = ""

            for div_card in div_cards:
                title = div_card.find("h3")
                if not title:
                    continue
                name = title.get("title")
                a_tag = div_card.find("a")
                listing_id = a_tag.get("data-listing-id")
                if listing_id == last_listing:
                    continue
                link = a_tag.get("href")
                stars = 0.0
                has_stars = div_card.find("span", class_="wt-text-title-small")
                if has_stars:
                    stars = float(has_stars.text)
                currency = "n/a"
                currency_holder = div_card.find("span", class_="currency-symbol")
                if currency_holder:
                    currency = currency_holder.text
                prices = div_card.find_all("span", class_="currency-value")
                if len(prices) < 1:
                    continue
                current_price = prices[0].text
                original_price = current_price
                if len(prices) > 1:
                    original_price = prices[1].text

                search_data = {
                    "name": name,
                    "stars": stars,
                    "url": link,
                    "price_currency": currency,
                    "listing_id": listing_id,
                    "current_price": current_price,
                    "original_price": original_price
                }

                print(search_data)
                result_count += 1
                last_listing = listing_id

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["coffee mug"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
- Our paginated URLs are laid out like this: https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}
- start_scrape() allows us to crawl a list of pages instead of just the first page.

Next, we need a dataclass to represent our search objects. We also need a DataPipeline to store these dataclass objects inside a CSV file. Our dataclass will be called SearchData. It holds all of the information we've been extracting in our previous two iterations.

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    price_currency: str = ""
    listing_id: int = 0
    current_price: float = 0.0
    original_price: float = 0.0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
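A quick usage sketch of the dataclass above, assuming the SearchData definition is in scope: empty strings get a default value and whitespace gets stripped by __post_init__().

item = SearchData(name="  Handmade Coffee Mug  ", url="", stars=4.8)
print(item.name)  # "Handmade Coffee Mug" (whitespace stripped)
print(item.url)   # "No url" (empty strings get a default)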
Next, here is our DataPipeline. It takes in a dataclass and stores it to a CSV file. It also filters out duplicates using the name attribute.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
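Here is roughly how the pipeline gets used on its own, assuming the SearchData and DataPipeline definitions above are in scope. The filename is just an example.

pipeline = DataPipeline(csv_filename="test-output.csv")
pipeline.add_data(SearchData(name="Handmade Coffee Mug", stars=4.8))
pipeline.add_data(SearchData(name="Handmade Coffee Mug", stars=4.8))  # duplicate name, gets dropped
pipeline.close_pipeline()  # flushes anything left in the queue to the CSV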
In the full code below, we open a DataPipeline inside of our main. The DataPipeline gets passed into start_scrape(), which in turn passes it to scrape_search_results()
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" price_currency: str = "" listing_id: int = 0 current_price: float = 0.0 original_price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="wt-height-full") result_count = 0 last_listing = "" for div_card in div_cards: title = div_card.find("h3") if not title: continue name = title.get("title") a_tag = div_card.find("a") listing_id = a_tag.get("data-listing-id") if listing_id == last_listing: continue link = a_tag.get("href") stars = 0.0 has_stars = div_card.find("span", class_="wt-text-title-small") if has_stars: stars = float(has_stars.text) currency = "n/a" currency_holder = div_card.find("span", class_="currency-symbol") if currency_holder: currency = currency_holder.text prices = div_card.find_all("span", class_="currency-value") if len(prices) < 1: continue current_price = prices[0].text original_price = current_price if len(prices) > 1: original_price = prices[1].text search_data = SearchData( name=name, stars=stars, url=link, price_currency=currency, listing_id=listing_id, current_price=current_price, original_price=original_price ) data_pipeline.add_data(search_data) result_count+=1 last_listing = listing_id logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
- SearchData is used to represent individual search result objects on the page.
- DataPipeline opens a pipe to a CSV file. It is then used to save SearchData objects to the CSV.

Next, we'll use ThreadPoolExecutor to run our parsing function on multiple threads simultaneously. To accomplish this, we're going to replace the for loop in start_scrape() with ThreadPoolExecutor. Here is our rewritten start_scrape() function.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
- scrape_search_results is the function you wish to call on each thread.
- executor.map() takes each item from each list and passes it into scrape_search_results.
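If the way executor.map() lines those lists up feels opaque, this tiny standalone example shows the same pattern with a toy function. The names here are invented purely for illustration.

import concurrent.futures

def greet(name, greeting, punctuation):
    print(f"{greeting}, {name}{punctuation}")

names = ["page 1", "page 2", "page 3"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Argument i of each call comes from item i of each list
    executor.map(greet, names, ["Scraping"] * len(names), ["..."] * len(names))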
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" price_currency: str = "" listing_id: int = 0 current_price: float = 0.0 original_price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="wt-height-full") result_count = 0 last_listing = "" for div_card in div_cards: title = div_card.find("h3") if not title: continue name = title.get("title") a_tag = div_card.find("a") listing_id = a_tag.get("data-listing-id") if listing_id == last_listing: continue link = a_tag.get("href") stars = 0.0 has_stars = div_card.find("span", class_="wt-text-title-small") if has_stars: stars = float(has_stars.text) currency = "n/a" currency_holder = div_card.find("span", class_="currency-symbol") if currency_holder: currency = currency_holder.text prices = div_card.find_all("span", class_="currency-value") if len(prices) < 1: continue current_price = prices[0].text original_price = current_price if len(prices) > 1: original_price = prices[1].text search_data = SearchData( name=name, stars=stars, url=link, price_currency=currency, listing_id=listing_id, current_price=current_price, original_price=original_price ) data_pipeline.add_data(search_data) result_count+=1 last_listing = listing_id logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
- start_scrape() now runs our parsing function on multiple pages concurrently.
- ThreadPoolExecutor gives us the ability to run any function on multiple threads.

Next, we need to route our requests through the ScrapeOps Proxy API. The function below takes in a URL and a location, and spits out a ScrapeOps proxy URL.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
We also add a bypass argument to this function. There are a whole slew of different values we can pass in here. generic_level_4 is the strongest and it costs 85 API credits per use. This makes our proxy connection 85 times more expensive than a standard proxy with ScrapeOps! You can view the other bypass options here.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "bypass": "generic_level_4",
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
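For reference, calling the function above produces a proxy URL that you pass straight to requests. This is a hedged sketch assuming the get_scrapeops_url() above and a placeholder API key.

# Assuming API_KEY = "YOUR-API-KEY" and the get_scrapeops_url() defined above
target = "https://www.etsy.com/search?q=coffee+mug&ref=pagination&page=1"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-API-KEY&url=https%3A%2F%2Fwww.etsy.com%2F...&bypass=generic_level_4&country=us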
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "bypass": "generic_level_4", "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" price_currency: str = "" listing_id: int = 0 current_price: float = 0.0 original_price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="wt-height-full") result_count = 0 last_listing = "" for div_card in div_cards: title = div_card.find("h3") if not title: continue name = title.get("title") a_tag = div_card.find("a") listing_id = a_tag.get("data-listing-id") if listing_id == last_listing: continue link = a_tag.get("href") stars = 0.0 has_stars = div_card.find("span", class_="wt-text-title-small") if has_stars: stars = float(has_stars.text) currency = "n/a" currency_holder = div_card.find("span", class_="currency-symbol") if currency_holder: currency = currency_holder.text prices = div_card.find_all("span", class_="currency-value") if len(prices) < 1: continue current_price = prices[0].text original_price = current_price if len(prices) > 1: original_price = prices[1].text search_data = SearchData( name=name, stars=stars, url=link, price_currency=currency, listing_id=listing_id, current_price=current_price, original_price=original_price ) data_pipeline.add_data(search_data) result_count+=1 last_listing = listing_id logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
If you'd like to tweak your results, you can change any of the following constants inside of main:

- keyword_list: Contains a list of keywords to be searched and scraped.
- MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
- PAGES: Specifies the number of pages to scrape for each keyword.
- LOCATION: Defines the geographic location from which the scraping requests appear to originate.

Here is our main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["coffee mug"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
Now it's time to build our review scraper. We'll start with a parsing function, process_item(), that pulls the reviews from an individual listing page.

def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = []
                for review_rank in range(4):
                    card = soup.select_one(f"div[id='review-text-width-{review_rank}']")
                    if card:
                        # only keep review slots that actually exist on the page
                        review_cards.append(card)

                for review_card in review_cards:
                    rating = review_card.select_one("input[name='rating']").get("value")
                    review = review_card.find("p").text.strip()
                    name_date_holder = review_card.find("a", class_="wt-text-link wt-mr-xs-1")
                    if not name_date_holder:
                        continue
                    name = name_date_holder.get("aria-label").replace("Reviewer ", "")
                    if not name:
                        name = "n/a"
                    date = name_date_holder.parent.text.strip().replace(name, "")
                    if date == "":
                        continue

                    review_data = {
                        "name": name,
                        "date": date,
                        "review": review,
                        "stars": rating
                    }

                    print(review_data)

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
We find each review card with soup.select_one(f"div[id='review-text-width-{review_rank}']"). Our review_rank increments with each review. From each card we pull:

- rating: review_card.select_one("input[name='rating']").get("value")
- review: review_card.find("p").text.strip()

We then look for the name and date, and if they're not present, we skip that element because it doesn't match our review format.

Next, we need to read our CSV file into an array of dict objects. To do this, we'll use Python's builtin csv.DictReader. Take a look at the snippet below. It's like the review scraper equivalent to start_scrape() from our crawl. This is our process_results() function.
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)
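If csv.DictReader is new to you, here is a small standalone sketch of what those rows look like. The filename and column names mirror our crawler output, but they are only an example.

import csv

# Each row comes back as a dict keyed by the CSV header
with open("coffee-mug.csv", newline="") as file:
    for row in csv.DictReader(file):
        print(row["name"], row["url"])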
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "bypass": "generic_level_4", "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" price_currency: str = "" listing_id: int = 0 current_price: float = 0.0 original_price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="wt-height-full") result_count = 0 last_listing = "" for div_card in div_cards: title = div_card.find("h3") if not title: continue name = title.get("title") a_tag = div_card.find("a") listing_id = a_tag.get("data-listing-id") if listing_id == last_listing: continue link = a_tag.get("href") stars = 0.0 has_stars = div_card.find("span", class_="wt-text-title-small") if has_stars: stars = float(has_stars.text) currency = "n/a" currency_holder = div_card.find("span", class_="currency-symbol") if currency_holder: currency = currency_holder.text prices = div_card.find_all("span", class_="currency-value") if len(prices) < 1: continue current_price = prices[0].text original_price = current_price if len(prices) > 1: original_price = prices[1].text search_data = SearchData( name=name, stars=stars, url=link, price_currency=currency, listing_id=listing_id, current_price=current_price, original_price=original_price ) data_pipeline.add_data(search_data) result_count+=1 last_listing = listing_id logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = [] for review_rank in range(4): card = soup.select_one(f"div[id='review-text-width-{review_rank}']") review_cards.append(card) for review_card in review_cards: rating = review_card.select_one("input[name='rating']").get("value") review = review_card.find("p").text.strip() name_date_holder = review_card.find("a", class_="wt-text-link wt-mr-xs-1") if not name_date_holder: continue name = name_date_holder.get("aria-label").replace("Reviewer ", "") if not name: name = "n/a" date = name_date_holder.parent.text.strip().replace(name, "") if 
date == "": continue review_data = { "name": name, "date": date, "review": review, "stars": rating } print(review_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
- process_item() is used to parse reviews for individual items from our crawl.
- process_results() is used to read the CSV file and call process_item() on each row from the file.

Our DataPipeline already gives us the ability to store dataclass objects in a CSV file. All we need to do is write a new dataclass. We'll call this one ReviewData.

@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    review: str = ""
    stars: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
Now we just need to open a new DataPipeline from inside our parsing function. As we parse the reviews, we turn them into ReviewData
objects and then we pass those objects into the pipeline.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "bypass": "generic_level_4", "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" price_currency: str = "" listing_id: int = 0 current_price: float = 0.0 original_price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" review: str = "" stars: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="wt-height-full") result_count = 0 last_listing = "" for div_card in div_cards: title = div_card.find("h3") if not title: continue name = title.get("title") a_tag = div_card.find("a") listing_id = a_tag.get("data-listing-id") if listing_id == last_listing: continue link = a_tag.get("href") stars = 0.0 has_stars = div_card.find("span", class_="wt-text-title-small") if has_stars: stars = float(has_stars.text) currency = "n/a" currency_holder = div_card.find("span", class_="currency-symbol") if currency_holder: currency = currency_holder.text prices = div_card.find_all("span", class_="currency-value") if len(prices) < 1: continue current_price = prices[0].text original_price = current_price if len(prices) > 1: original_price = prices[1].text search_data = SearchData( name=name, stars=stars, url=link, price_currency=currency, listing_id=listing_id, current_price=current_price, original_price=original_price ) data_pipeline.add_data(search_data) result_count+=1 last_listing = listing_id logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = [] for review_rank in range(4): card = soup.select_one(f"div[id='review-text-width-{review_rank}']") review_cards.append(card) review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv") for review_card in review_cards: rating = review_card.select_one("input[name='rating']").get("value") review = review_card.find("p").text.strip() name_date_holder = review_card.find("a", class_="wt-text-link wt-mr-xs-1") if not name_date_holder: continue name = name_date_holder.get("aria-label").replace("Reviewer 
", "") if not name: name = "n/a" date = name_date_holder.parent.text.strip().replace(name, "") if date == "": continue review_data = ReviewData( name=name, date=date, review=review, stars=rating ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
- From inside process_item(), we open a new DataPipeline.
- We parse the reviews into ReviewData objects.
- Each ReviewData object gets passed into the pipeline.

Finally, we'll replace the for loop in process_results() with the faster and more efficient ThreadPoolExecutor. Here is our rewritten process_results() function.
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
process_item: the function we want to call on each available thread.
reader: the list of dict objects we want to process. The location and retries arguments get passed in as lists of the same length.

Finally, to route every request through the ScrapeOps proxy, we swap our requests.get() call for the following:

requests.get(get_scrapeops_url(url, location=location))
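If the executor.map() call looks odd, here is a tiny, self-contained sketch of how it lines those lists up, one element from each list per call. The worker function, its arguments, and the row values below are made up purely for illustration.

import concurrent.futures

# Toy worker standing in for process_item.
def worker(row, location, retries):
    print(f"processing {row} from {location} with {retries} retries")

rows = ["row-0", "row-1", "row-2"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(
        worker,
        rows,                # first argument of each call
        ["us"] * len(rows),  # second argument, repeated for every call
        [3] * len(rows)      # third argument, repeated for every call
    )

With that pattern in mind, here is the full production script.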
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "bypass": "generic_level_4", "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" price_currency: str = "" listing_id: int = 0 current_price: float = 0.0 original_price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" review: str = "" stars: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="wt-height-full") result_count = 0 last_listing = "" for div_card in div_cards: title = div_card.find("h3") if not title: continue name = title.get("title") a_tag = div_card.find("a") listing_id = a_tag.get("data-listing-id") if listing_id == last_listing: continue link = a_tag.get("href") stars = 0.0 has_stars = div_card.find("span", class_="wt-text-title-small") if has_stars: stars = float(has_stars.text) currency = "n/a" currency_holder = div_card.find("span", class_="currency-symbol") if currency_holder: currency = currency_holder.text prices = div_card.find_all("span", class_="currency-value") if len(prices) < 1: continue current_price = prices[0].text original_price = current_price if len(prices) > 1: original_price = prices[1].text search_data = SearchData( name=name, stars=stars, url=link, price_currency=currency, listing_id=listing_id, current_price=current_price, original_price=original_price ) data_pipeline.add_data(search_data) result_count+=1 last_listing = listing_id logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = [] for review_rank in range(4): card = soup.select_one(f"div[id='review-text-width-{review_rank}']") if card: review_cards.append(card) review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv") for review_card in review_cards: rating = review_card.select_one("input[name='rating']").get("value") review = review_card.find("p").text.strip() name_date_holder = review_card.find("a", class_="wt-text-link wt-mr-xs-1") if not name_date_holder: continue name = 
name_date_holder.get("aria-label").replace("Reviewer ", "") if not name: name = "n/a" date = name_date_holder.parent.text.strip().replace(name, "") if date == "": continue review_data = ReviewData( name=name, date=date, review=review, stars=rating ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_item, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Our full main is shown below. Feel free to change any of the constants to tweak your results, just like before.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["coffee mug"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
When scraping, always pay attention to Etsy's Terms of Service and robots.txt. Violating these policies can lead to suspension and even a permanent ban from the site. You can view those documents below.

Then check out ScrapeOps, the complete toolkit for web scraping.
To run this code, you'll need a ScrapeOps API key saved in a config.json file.

{"api_key": "your-super-secret-api-key"}
.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom time import sleepfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "bypass": "generic_level_4", "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" listing_id: int = 0 price_currency: str = "" price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" review: str = "" stars: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: options = webdriver.ChromeOptions() options.add_argument("--headless") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36") prefs = { "profile.managed_default_content_settings.javascript": 2, "profile.managed_default_content_settings.stylesheets": 2 } options.add_experimental_option("prefs", prefs) driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Successfully pinged {url}") content = driver.page_source script_tag_begin_index = content.find('"itemListElement"') script_tag_end_index = content.find('"numberOfItems"') json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}" json_data = json.loads(json_string) list_elements = json_data["itemListElement"] for element in list_elements: name = element["name"] link = element["url"] listing_id = link.split("/")[-2] currency = element["offers"]["priceCurrency"] price = element["offers"]["price"] search_data = SearchData( name=name, url=link, listing_id=listing_id, price_currency=currency, price=float(price) ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] print("getting", url) tries = 0 success = False while tries <= retries and not success: options = webdriver.ChromeOptions() options.add_argument("--headless") driver = webdriver.Chrome(options=options) driver.get((get_scrapeops_url(url, location=location))) logger.info(f"successfully pinged: {url}") try: content = driver.page_source script_tag_begin_index = content.find('"review":') script_tag_end_index = content.find('}}]') json_string = "{"+ content[script_tag_begin_index:script_tag_end_index] + "}}]}" json_data = json.loads(json_string) list_elements = json_data["review"] review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv") for element in list_elements: review_data = ReviewData( name=element["author"]["name"], date=element["datePublished"], review=element["reviewBody"], stars=element["reviewRating"]["ratingValue"] ) 
review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_item, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To tweak this scraper, change any of these constants in main:

MAX_RETRIES: Controls the number of retry attempts in case of an error.
MAX_THREADS: Defines the number of threads to use during concurrent scraping.
PAGES: The number of pages to scrape for each keyword.
LOCATION: Controls the location (country) to simulate the user browsing from a specific region.
keyword_list: A list of keywords you want to scrape.

Note that this code uses a bypass setting of generic_level_4. It costs 85 API credits per call, so this configuration costs significantly more than standard requests to the ScrapeOps API. This scraper relies on the bypass argument: the feature will get us through even the toughest of anti-bots, but the bypass parameter costs extra API credits and this is an expensive scrape to run.

At a high level, this is going to be pretty similar to other scraping projects from this series. We need to build both a search crawler and a review scraper. We'll build the search crawler step by step, fetching each results page with driver.get().

Take a look at the screenshot below. This shot contains an Etsy search results page. Take a look at the URL:

https://www.etsy.com/search?q=coffee+mug&ref=pagination&page=2
Strip away the page number and we get our base search URL:

https://www.etsy.com/search?q=coffee+mug&ref=pagination

q=coffee+mug represents our search query. q is for "query" and coffee+mug represents the value, "coffee mug". So our reconstructed search URLs look like this:

https://www.etsy.com/search?q={formatted_keyword}&ref=pagination

Next is the page parameter. This one is pretty simple. If we add page={page_number+1} to our URL, we can control the page number of our results. We use page_number+1 because Python's range() begins counting at 0 but our page numbers begin at 1.

With support for pagination added, our URLs look like this:

https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}
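As a quick illustration (the keyword and the three-page range below are placeholder values, not part of the scraper), this is how those paginated URLs get assembled:

keyword = "coffee mug"                      # placeholder search term
formatted_keyword = keyword.replace(" ", "+")

for page_number in range(3):                # placeholder: first three pages
    print(f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}")
# page=1, page=2, page=3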
ref=pagination might look like it's relevant to our pagination, however, it is not. ref is short for referrer or referral. ref=pagination tells Etsy that we were referred to the page via their pagination system. This parameter makes us look less like a bot: a normal person is going to visit page 2 by clicking the page 2 button, which gives us a referral to the page using the pagination.

To control our geolocation, we use the country parameter with the ScrapeOps API. We pass "country": "us" if we want to appear in the US. If we want to appear in the UK, we pass "country": "uk".
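To make that concrete, here is a minimal sketch of how the country value ends up in the proxied request. It assumes the same proxy endpoint and payload format used by the get_scrapeops_url() function later in this article, and the API key shown is a placeholder.

from urllib.parse import urlencode

# Placeholder values for illustration only.
payload = {
    "api_key": "your-super-secret-api-key",
    "url": "https://www.etsy.com/search?q=coffee+mug&ref=pagination&page=1",
    "country": "uk"   # browse as if we're in the UK
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))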
Now let's set the project up. Create and enter a new project folder:

mkdir etsy-scraper
cd etsy-scraper
python -m venv venv
source venv/bin/activate
pip install selenium
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom time import sleepfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: options = webdriver.ChromeOptions() options.add_argument("--headless") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36") prefs = { "profile.managed_default_content_settings.javascript": 2, "profile.managed_default_content_settings.stylesheets": 2 } options.add_experimental_option("prefs", prefs) driver = webdriver.Chrome(options=options) try: driver.get(url) logger.info(f"Successfully pinged {url}") content = driver.page_source script_tag_begin_index = content.find('"itemListElement"') script_tag_end_index = content.find('"numberOfItems"') json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}" json_data = json.loads(json_string) list_elements = json_data["itemListElement"] for element in list_elements: print(element) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
scrape_search_results() does the following when we extract the data:

content.find('"itemListElement"') gets the beginning of our item list.
content.find('"numberOfItems"') finds the end of our item list.
We slice the JSON data out of the page source using those two indexes (there's a short standalone sketch of this trick right after the next snippet).

Next, our parser needs to support the page parameter. We also need a function that allows us to crawl multiple pages. Take a look at the snippet below, start_scrape().

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
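And here is the standalone sketch of the slicing trick mentioned above. The content string is a made-up stand-in for driver.page_source, trimmed down to just the two markers we search for.

import json

# Made-up stand-in for driver.page_source; no space before "numberOfItems",
# so the -1 offset below trims the comma and stops right after the closing bracket.
content = '<script>{"itemListElement":[{"name":"Example Mug"}],"numberOfItems":1}</script>'

script_tag_begin_index = content.find('"itemListElement"')
script_tag_end_index = content.find('"numberOfItems"')

# Slice out everything between the two markers and wrap it back into an object.
json_string = "{" + content[script_tag_begin_index:script_tag_end_index-1] + "}"
json_data = json.loads(json_string)
print(json_data["itemListElement"][0]["name"])  # Example Mug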
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom time import sleepfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: options = webdriver.ChromeOptions() options.add_argument("--headless") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36") prefs = { "profile.managed_default_content_settings.javascript": 2, "profile.managed_default_content_settings.stylesheets": 2 } options.add_experimental_option("prefs", prefs) driver = webdriver.Chrome(options=options) try: driver.get(url) logger.info(f"Successfully pinged {url}") content = driver.page_source script_tag_begin_index = content.find('"itemListElement"') script_tag_end_index = content.find('"numberOfItems"') json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}" json_data = json.loads(json_string) list_elements = json_data["itemListElement"] for element in list_elements: print(element) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
Our URLs are now formatted like this:

https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}

start_scrape() lets us call our parsing function on an entire list of pages.

Now that we're extracting the right data, we need somewhere to put it. We'll start with a dataclass called SearchData. It holds all of the information we've been extracting in our previous two iterations.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    listing_id: int = 0
    price_currency: str = ""
    price: float = 0.0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
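To see what __post_init__ actually does, here is a small example using made-up values; it assumes the SearchData class defined above.

# Illustration only: invented values showing the string cleanup in __post_init__.
item = SearchData(
    name="  Handmade Coffee Mug  ",  # padded whitespace gets stripped
    url="",                          # empty strings become "No url"
    listing_id=123456,
    price_currency="USD",
    price=24.99
)
print(item.name)  # "Handmade Coffee Mug"
print(item.url)   # "No url"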
Next, we need a DataPipeline. It takes in a dataclass and stores it to a CSV file. It also filters out duplicates using the name attribute.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            # sleep comes from "from time import sleep" at the top of our script
            sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
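Here is a short, hand-driven example of the pipeline (the listing values are invented) showing the same add/close pattern our crawler uses.

# Illustration only: drive the pipeline by hand with made-up data.
pipeline = DataPipeline(csv_filename="example-mugs.csv")

pipeline.add_data(SearchData(name="Mug A", url="https://www.etsy.com/listing/111",
                             listing_id=111, price_currency="USD", price=19.99))
pipeline.add_data(SearchData(name="Mug A", url="https://www.etsy.com/listing/111",
                             listing_id=111, price_currency="USD", price=19.99))  # duplicate name, dropped

pipeline.close_pipeline()  # flushes anything left in the queue to example-mugs.csv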
In the full code below, we open a DataPipeline inside of our main. The DataPipeline gets passed into start_scrape(), which in turn passes it to scrape_search_results()
.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom time import sleepfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" listing_id: int = 0 price_currency: str = "" price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: options = webdriver.ChromeOptions() options.add_argument("--headless") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36") prefs = { "profile.managed_default_content_settings.javascript": 2, "profile.managed_default_content_settings.stylesheets": 2 } options.add_experimental_option("prefs", prefs) driver = webdriver.Chrome(options=options) try: driver.get(url) logger.info(f"Successfully pinged {url}") content = driver.page_source script_tag_begin_index = content.find('"itemListElement"') script_tag_end_index = content.find('"numberOfItems"') json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}" json_data = json.loads(json_string) list_elements = json_data["itemListElement"] for element in list_elements: name = element["name"] link = element["url"] listing_id = link.split("/")[-2] currency = element["offers"]["priceCurrency"] price = element["offers"]["price"] search_data = SearchData( name=name, url=link, listing_id=listing_id, price_currency=currency, price=float(price) ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
SearchData represents individual search result objects on the page. DataPipeline opens a pipe to our CSV file. With the pipeline open, we can save SearchData objects to the CSV.

Next, we'll use ThreadPoolExecutor to run our parsing function on multiple threads at the same time. To do this, we'll replace the for loop in start_scrape() with ThreadPoolExecutor. Here is our rewritten start_scrape() function.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
scrape_search_results is the function you wish to call on each thread.
executor.map() takes each item from each list and passes it into scrape_search_results
.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom time import sleepfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" listing_id: int = 0 price_currency: str = "" price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: options = webdriver.ChromeOptions() options.add_argument("--headless") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36") prefs = { "profile.managed_default_content_settings.javascript": 2, "profile.managed_default_content_settings.stylesheets": 2 } options.add_experimental_option("prefs", prefs) driver = webdriver.Chrome(options=options) try: driver.get(url) logger.info(f"Successfully pinged {url}") content = driver.page_source script_tag_begin_index = content.find('"itemListElement"') script_tag_end_index = content.find('"numberOfItems"') json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}" json_data = json.loads(json_string) list_elements = json_data["itemListElement"] for element in list_elements: name = element["name"] link = element["url"] listing_id = link.split("/")[-2] currency = element["offers"]["priceCurrency"] price = element["offers"]["price"] search_data = SearchData( name=name, url=link, listing_id=listing_id, price_currency=currency, price=float(price) ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
With start_scrape() rewritten, our parsing function now runs on multiple pages concurrently. ThreadPoolExecutor gives us the ability to run any function on multiple threads.

Next, let's add proxy support. Here is a basic version of our proxy function.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
To get past Etsy's anti-bots, we add the bypass argument to this function. There are a whole slew of different values we can pass in here. generic_level_4 is the strongest, and it costs 85 API credits per use. This makes our proxy connection 85 times more expensive than a standard proxy with ScrapeOps! You can view the other bypass options here.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "bypass": "generic_level_4",
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
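As a sanity check, a call to this function produces a proxied URL along the lines shown below. The API key comes from your config.json (a placeholder is shown here), and the exact parameter ordering simply follows urlencode().

# Illustration only: the page URL is an example search page.
proxy_url = get_scrapeops_url(
    "https://www.etsy.com/search?q=coffee+mug&ref=pagination&page=1",
    location="us"
)
print(proxy_url)
# https://proxy.scrapeops.io/v1/?api_key=<your-key>&url=https%3A%2F%2Fwww.etsy.com%2Fsearch%3Fq%3Dcoffee%2Bmug...&bypass=generic_level_4&country=us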
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom time import sleepfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "bypass": "generic_level_4", "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" listing_id: int = 0 price_currency: str = "" price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: options = webdriver.ChromeOptions() options.add_argument("--headless") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36") prefs = { "profile.managed_default_content_settings.javascript": 2, "profile.managed_default_content_settings.stylesheets": 2 } options.add_experimental_option("prefs", prefs) driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Successfully pinged {url}") content = driver.page_source script_tag_begin_index = content.find('"itemListElement"') script_tag_end_index = content.find('"numberOfItems"') json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}" json_data = json.loads(json_string) list_elements = json_data["itemListElement"] for element in list_elements: name = element["name"] link = element["url"] listing_id = link.split("/")[-2] currency = element["offers"]["priceCurrency"] price = element["offers"]["price"] search_data = SearchData( name=name, url=link, listing_id=listing_id, price_currency=currency, price=float(price) ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Now it's time to run the crawler. You can adjust any of the following constants inside our main:

MAX_RETRIES
MAX_THREADS
PAGES
LOCATION
keyword_list

Here is our main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["coffee mug"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
Next, we write a parsing function for individual listings, process_item(). It pulls the embedded review JSON out of the page source.

def process_item(row, location, retries=3):
    url = row["url"]
    print("getting", url)
    tries = 0
    success = False

    while tries <= retries and not success:
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")
        driver = webdriver.Chrome(options=options)
        driver.get(url)
        logger.info(f"successfully pinged: {url}")
        try:
            content = driver.page_source
            script_tag_begin_index = content.find('"review":')
            script_tag_end_index = content.find('}}]')
            json_string = "{" + content[script_tag_begin_index:script_tag_end_index] + "}}]}"
            json_data = json.loads(json_string)
            list_elements = json_data["review"]

            for element in list_elements:
                print(element)

            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
Now we need to read the rows of our crawler's CSV report into an array of dict objects. We use Python's builtin csv.DictReader to do this. Take a look at the snippet below. It's kind of like an equivalent to start_scrape() from our crawl. This is our process_results() function.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)
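If you haven't used csv.DictReader before, here is roughly what those dict objects look like. The filename matches our crawler's output, but the row shown in the comments is invented.

import csv

# "coffee-mug.csv" is the report produced by the crawler above.
with open("coffee-mug.csv", newline="") as file:
    reader = list(csv.DictReader(file))

print(reader[0])
# e.g. {'name': 'Handmade Coffee Mug', 'url': 'https://www.etsy.com/listing/...',
#       'listing_id': '111', 'price_currency': 'USD', 'price': '19.99'}
print(reader[0]["url"])  # each row is a plain dict keyed by the CSV header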
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom time import sleepfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "bypass": "generic_level_4", "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" listing_id: int = 0 price_currency: str = "" price: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: options = webdriver.ChromeOptions() options.add_argument("--headless") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36") prefs = { "profile.managed_default_content_settings.javascript": 2, "profile.managed_default_content_settings.stylesheets": 2 } options.add_experimental_option("prefs", prefs) driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Successfully pinged {url}") content = driver.page_source script_tag_begin_index = content.find('"itemListElement"') script_tag_end_index = content.find('"numberOfItems"') json_string = "{"+ content[script_tag_begin_index:script_tag_end_index-1] + "}" json_data = json.loads(json_string) list_elements = json_data["itemListElement"] for element in list_elements: name = element["name"] link = element["url"] listing_id = link.split("/")[-2] currency = element["offers"]["priceCurrency"] price = element["offers"]["price"] search_data = SearchData( name=name, url=link, listing_id=listing_id, price_currency=currency, price=float(price) ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] print("getting", url) tries = 0 success = False while tries <= retries and not success: options = webdriver.ChromeOptions() options.add_argument("--headless") driver = webdriver.Chrome(options=options) driver.get(url) logger.info(f"successfully pinged: {url}") try: content = driver.page_source script_tag_begin_index = content.find('"review":') script_tag_end_index = content.find('}}]') json_string = "{"+ content[script_tag_begin_index:script_tag_end_index] + "}}]}" json_data = json.loads(json_string) list_elements = json_data["review"] for element in list_elements: print(element) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully 
parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["coffee mug"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
process_item() is used to parse reviews for individual items from our crawl. process_results() is used to read the CSV file and call process_item() on each row from the file.

Our DataPipeline already gives us the ability to store dataclass objects in a CSV file; we just need a new dataclass. We'll call this one ReviewData. It's almost exactly like SearchData.

@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    review: str = ""
    stars: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
Just like before, we open a DataPipeline from inside our parsing function. When we parse our reviews, we turn them into ReviewData
objects and then we pass those objects into the pipeline.

```python
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "bypass": "generic_level_4",
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    listing_id: int = 0
    price_currency: str = ""
    price: float = 0.0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    review: str = ""
    stars: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        # Wait for any in-progress save to finish before the final flush
        if self.csv_file_open:
            sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")
        options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
        prefs = {
            "profile.managed_default_content_settings.javascript": 2,
            "profile.managed_default_content_settings.stylesheets": 2
        }
        options.add_experimental_option("prefs", prefs)
        driver = webdriver.Chrome(options=options)
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Successfully pinged {url}")

            content = driver.page_source

            script_tag_begin_index = content.find('"itemListElement"')
            script_tag_end_index = content.find('"numberOfItems"')
            json_string = "{" + content[script_tag_begin_index:script_tag_end_index-1] + "}"
            json_data = json.loads(json_string)

            list_elements = json_data["itemListElement"]

            for element in list_elements:
                name = element["name"]
                link = element["url"]
                listing_id = link.split("/")[-2]
                currency = element["offers"]["priceCurrency"]
                price = element["offers"]["price"]

                search_data = SearchData(
                    name=name,
                    url=link,
                    listing_id=listing_id,
                    price_currency=currency,
                    price=float(price)
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    print("getting", url)
    tries = 0
    success = False

    while tries <= retries and not success:
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")
        driver = webdriver.Chrome(options=options)
        driver.get(url)
        logger.info(f"successfully pinged: {url}")
        try:
            content = driver.page_source

            script_tag_begin_index = content.find('"review":')
            script_tag_end_index = content.find('}}]')
            json_string = "{" + content[script_tag_begin_index:script_tag_end_index] + "}}]}"
            json_data = json.loads(json_string)

            list_elements = json_data["review"]

            review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv")
            for element in list_elements:
                review_data = ReviewData(
                    name=element["author"]["name"],
                    date=element["datePublished"],
                    review=element["reviewBody"],
                    stars=element["reviewRating"]["ratingValue"]
                )
                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["coffee mug"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
```
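Both scrape_search_results() and process_item() pull their data out of JSON that Etsy embeds directly in the page, locating it by raw string slicing. If you'd rather parse that embedded data in one go, here is a minimal sketch of the same idea, assuming the listing data ships as an application/ld+json script tag containing an itemListElement array. The helper name extract_item_list() is ours, not part of the scraper, and it leans on BeautifulSoup, which we already used earlier in this guide.

```python
import json

from bs4 import BeautifulSoup  # already used earlier in this guide


def extract_item_list(page_source):
    """Hypothetical helper: parse Etsy's embedded JSON-LD instead of slicing strings."""
    soup = BeautifulSoup(page_source, "html.parser")
    for script in soup.find_all("script", type="application/ld+json"):
        try:
            data = json.loads(script.string or "")
        except json.JSONDecodeError:
            continue
        # The search page describes its results as an ItemList
        if isinstance(data, dict) and "itemListElement" in data:
            return data["itemListElement"]
    return []
```

You could drop something like this into scrape_search_results() in place of the find()/slice logic; everything downstream, building SearchData objects and feeding the pipeline, stays the same.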
As you can see above, each listing's reviews now get saved through their own DataPipeline. We parse the reviews into ReviewData objects, and each ReviewData object gets passed into the pipeline.

To add concurrency, we replace our for loop with the faster and more efficient ThreadPoolExecutor. Here is our rewritten process_results() function.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
Pay attention to the arguments we pass into executor.map():

- process_item: the function we want to call on each available thread.
- reader: the list of dict objects we want to process.
- [location] * len(reader) and [retries] * len(reader): the remaining arguments for process_item, repeated once per row.

There is a small standalone sketch of how executor.map() lines these arguments up just after this list.
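If the repeated lists look odd, remember that executor.map() works like the built-in map(): it takes one element from each iterable and passes them as positional arguments to each call. A tiny runnable sketch with toy data (not real Etsy rows):

```python
import concurrent.futures

rows = [{"url": "https://example.com/a"}, {"url": "https://example.com/b"}]

def fake_process_item(row, location, retries=3):
    # Each call receives one row, plus the same location/retries every time
    return f"{row['url']} ({location}, retries={retries})"

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(
        fake_process_item,
        rows,                  # one row per call
        ["us"] * len(rows),    # same location for every call
        [3] * len(rows)        # same retry count for every call
    )
    print(list(results))
    # ['https://example.com/a (us, retries=3)', 'https://example.com/b (us, retries=3)']
```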
When we scrape the individual listings, we also want to go through the ScrapeOps proxy. That only requires changing one line inside process_item():

```python
driver.get(get_scrapeops_url(url, location=location))
```

With that change in place, here is our finished scraper with the proxy fully integrated.
```python
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from time import sleep
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "bypass": "generic_level_4",
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    listing_id: int = 0
    price_currency: str = ""
    price: float = 0.0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    review: str = ""
    stars: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        # Wait for any in-progress save to finish before the final flush
        if self.csv_file_open:
            sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.etsy.com/search?q={formatted_keyword}&ref=pagination&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")
        options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
        prefs = {
            "profile.managed_default_content_settings.javascript": 2,
            "profile.managed_default_content_settings.stylesheets": 2
        }
        options.add_experimental_option("prefs", prefs)
        driver = webdriver.Chrome(options=options)
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Successfully pinged {url}")

            content = driver.page_source

            script_tag_begin_index = content.find('"itemListElement"')
            script_tag_end_index = content.find('"numberOfItems"')
            json_string = "{" + content[script_tag_begin_index:script_tag_end_index-1] + "}"
            json_data = json.loads(json_string)

            list_elements = json_data["itemListElement"]

            for element in list_elements:
                name = element["name"]
                link = element["url"]
                listing_id = link.split("/")[-2]
                currency = element["offers"]["priceCurrency"]
                price = element["offers"]["price"]

                search_data = SearchData(
                    name=name,
                    url=link,
                    listing_id=listing_id,
                    price_currency=currency,
                    price=float(price)
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    print("getting", url)
    tries = 0
    success = False

    while tries <= retries and not success:
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")
        driver = webdriver.Chrome(options=options)
        driver.get(get_scrapeops_url(url, location=location))
        logger.info(f"successfully pinged: {url}")
        try:
            content = driver.page_source

            script_tag_begin_index = content.find('"review":')
            script_tag_end_index = content.find('}}]')
            json_string = "{" + content[script_tag_begin_index:script_tag_end_index] + "}}]}"
            json_data = json.loads(json_string)

            list_elements = json_data["review"]

            review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '')}.csv")
            for element in list_elements:
                review_data = ReviewData(
                    name=element["author"]["name"],
                    date=element["datePublished"],
                    review=element["reviewBody"],
                    stars=element["reviewRating"]["ratingValue"]
                )
                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["coffee mug"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
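Once the crawl portion finishes, it only takes a few lines to sanity-check the CSV it produced. This is just an inspection snippet, not part of the scraper, and it assumes you ran with the default keyword, so the file is named coffee-mug.csv:

```python
import csv

# Quick look at the crawl output (name, url, listing_id, price_currency, price)
with open("coffee-mug.csv", newline="", encoding="utf-8") as file:
    for row in csv.DictReader(file):
        print(row["name"], "->", row["price_currency"], row["price"])
```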
You control the whole run from main below. Feel free to change any of the constants to tweak your results, just like before.

```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["coffee mug"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
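For example, a larger hypothetical run that crawls three pages each for two keywords only needs different values at the top of main; the rest of the block stays exactly as written above.

```python
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3            # hypothetical: crawl three search pages per keyword
LOCATION = "us"

## INPUT ---> List of keywords to scrape
keyword_list = ["coffee mug", "ceramic travel mug"]   # second keyword is a made-up example
```

More pages and more keywords simply mean more requests through the proxy, so expect the run to take proportionally longer.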
Whenever you scrape Etsy, you're subject to both their Terms of Service and their robots.txt. Violating these policies can lead to suspension or even a permanent ban from the site. You can view each of those documents below.