Then check out ScrapeOps, the complete toolkit for web scraping.
"gpu"
in the keyword_list
with your search criteria. For instance, if you want to buy a motherboard, replace "gpu"
with "motherboard"
. After you've done that, place your script in a folder.
Inside this folder, add a config.json
file with your ScrapeOps API key.
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" buy_it_now: bool = False is_auction: bool = False auction_end: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" comment: str = "" verified: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_holder = soup.select_one("div[id='srp-river-results']") div_cards = main_holder.select("div[class='s-item__info clearfix']") for div_card in div_cards: name = div_card.select_one("div[class='s-item__title']").text link = div_card.select_one("a").get("href") price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "") buy_it_now = False buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']") if buy_it_now_tag: buy_it_now = True is_auction = False auction_end = None auction_ends_tag = div_card.select_one("span[class='s-item__time-end']") if auction_ends_tag: is_auction = True auction_ends_tag.text search_data = SearchData( name=name, url=link, price=price, buy_it_now=buy_it_now, is_auction=is_auction, auction_end=auction_end ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = soup.select("div[class='fdbk-container__details']") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review_card in review_cards: username_container = review_card.select_one("div[class='fdbk-container__details__info__username']") username = username_container.text comment = review_card.select_one("div[class='fdbk-container__details__comment']").text verified = False verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']") if verified_holder: verified = True review_data = ReviewData( name=username, comment=comment, verified=verified ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_item, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
python name_of_your_python_file.py
https://www.ebay.com/sch/i.html?_nkw=gpu&_pgn=0`
?
, we have different parameters that are fed into our URL._nkw=gpu
is the one we'll pay attention to right now._nkw
tells eBay that we want to perform a search.gpu
is the item we're searching for.div
as a container card for information.
On the results page, each item gets its own container and we parse the information from that container. On the item page, each review gets its own container and we use the same process to pull out the results.
Here, you can see the container card for search result items. Each result card has a class
of s-item__info clearfix
.
With BeautifulSoup, we can go and extract all of our relevant information from this card.
class
of fdbk-container__details
.
https://www.ebay.com/sch/i.html?_nkw=gpu&_pgn=0
&
and _pgn
is our only other parameter._pgn
represents our page number. So in this URL, our page number is 0.https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}
country
param when we perform a request to ScrapeOps.
us
as our country
.uk
.mkdir ebay-scraper cd ebay-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_holder = soup.select_one("div[id='srp-river-results']") div_cards = main_holder.select("div[class='s-item__info clearfix']") for div_card in div_cards: name = div_card.select_one("div[class='s-item__title']").text link = div_card.select_one("a").get("href") price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "") buy_it_now = False buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']") if buy_it_now_tag: buy_it_now = True is_auction = False auction_end = None auction_ends_tag = div_card.select_one("span[class='s-item__time-end']") if auction_ends_tag: is_auction = True auction_ends_tag.text search_data = { "name": name, "url": url, "price": price, "buy_it_now": buy_it_now, "is_auction": is_auction, "auction_end": auction_end } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
_pgn={page_number}
.
In this section, we''ll also add a start_scrape()
function. The entire purpose of this function is to run scrape_search_results()
on multiple pages.
Here is start_scrape()
.
def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries)
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_holder = soup.select_one("div[id='srp-river-results']") div_cards = main_holder.select("div[class='s-item__info clearfix']") for div_card in div_cards: name = div_card.select_one("div[class='s-item__title']").text link = div_card.select_one("a").get("href") price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "") buy_it_now = False buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']") if buy_it_now_tag: buy_it_now = True is_auction = False auction_end = None auction_ends_tag = div_card.select_one("span[class='s-item__time-end']") if auction_ends_tag: is_auction = True auction_ends_tag.text search_data = { "name": name, "url": url, "price": price, "buy_it_now": buy_it_now, "is_auction": is_auction, "auction_end": auction_end } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
SearchData
class and the second one will be our DataPipeline
.
SearchData
is used to represent the result cards we find on the site.DataPipeline
takes in a dataclass
(like SearchData
) and pipes it to a CSV.SearchData
.
@dataclassclass SearchData: name: str = "" url: str = "" price: str = "" buy_it_now: bool = False is_auction: bool = False auction_end: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
DataPipeline
.
class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv()
search_data
item, we turn it into a SearchData
item and pass it into the DataPipeline
.
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" buy_it_now: bool = False is_auction: bool = False auction_end: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_holder = soup.select_one("div[id='srp-river-results']") div_cards = main_holder.select("div[class='s-item__info clearfix']") for div_card in div_cards: name = div_card.select_one("div[class='s-item__title']").text link = div_card.select_one("a").get("href") price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "") buy_it_now = False buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']") if buy_it_now_tag: buy_it_now = True is_auction = False auction_end = None auction_ends_tag = div_card.select_one("span[class='s-item__time-end']") if auction_ends_tag: is_auction = True auction_ends_tag.text search_data = SearchData( name=name, url=link, price=price, buy_it_now=buy_it_now, is_auction=is_auction, auction_end=auction_end ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
ThreadPoolExecutor
will be used to control our threads.
Let's refactor start_scrape()
.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages )
executor.map()
:
scrape_search_results
: is the function we'd like to run on every available thread.scrape_search_results()
would normally take are passed into arrays to later be passed into the function on each thread using it.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" buy_it_now: bool = False is_auction: bool = False auction_end: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_holder = soup.select_one("div[id='srp-river-results']") div_cards = main_holder.select("div[class='s-item__info clearfix']") for div_card in div_cards: name = div_card.select_one("div[class='s-item__title']").text link = div_card.select_one("a").get("href") price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "") buy_it_now = False buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']") if buy_it_now_tag: buy_it_now = True is_auction = False auction_end = None auction_ends_tag = div_card.select_one("span[class='s-item__time-end']") if auction_ends_tag: is_auction = True auction_ends_tag.text search_data = SearchData( name=name, url=link, price=price, buy_it_now=buy_it_now, is_auction=is_auction, auction_end=auction_end ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url
api_key
: you ScrapeOps API key.url
: the url you want to scrape.country
: the country you'd like to be routed through.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" buy_it_now: bool = False is_auction: bool = False auction_end: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_holder = soup.select_one("div[id='srp-river-results']") div_cards = main_holder.select("div[class='s-item__info clearfix']") for div_card in div_cards: name = div_card.select_one("div[class='s-item__title']").text link = div_card.select_one("a").get("href") price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "") buy_it_now = False buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']") if buy_it_now_tag: buy_it_now = True is_auction = False auction_end = None auction_ends_tag = div_card.select_one("span[class='s-item__time-end']") if auction_ends_tag: is_auction = True auction_ends_tag.text search_data = SearchData( name=name, url=link, price=price, buy_it_now=buy_it_now, is_auction=is_auction, auction_end=auction_end ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
main
. I've set PAGES
to 5 and all the other constants remain the same.
If you'd like to tweak your results, feel free to change any of them yourself.
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" buy_it_now: bool = False is_auction: bool = False auction_end: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_holder = soup.select_one("div[id='srp-river-results']") div_cards = main_holder.select("div[class='s-item__info clearfix']") for div_card in div_cards: name = div_card.select_one("div[class='s-item__title']").text link = div_card.select_one("a").get("href") price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "") buy_it_now = False buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']") if buy_it_now_tag: buy_it_now = True is_auction = False auction_end = None auction_ends_tag = div_card.select_one("span[class='s-item__time-end']") if auction_ends_tag: is_auction = True auction_ends_tag.text search_data = SearchData( name=name, url=link, price=price, buy_it_now=buy_it_now, is_auction=is_auction, auction_end=auction_end ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = soup.select("div[class='fdbk-container__details']") for review_card in review_cards: username_container = review_card.select_one("div[class='fdbk-container__details__info__username']") username = username_container.text comment = review_card.select_one("div[class='fdbk-container__details__comment']").text verified = False verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']") if verified_holder: verified = True review_data = { "name": username, "comment": comment, "verified": verified } print(review_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}")
"div[class='fdbk-container__details']"
is used to find each of the review_cards
by their CSS selector.review_cards
, we go through and pull the following:
username
with review_card.select_one("div[class='fdbk-container__details__info__username']")
review_card.select_one("div[class='fdbk-container__details__comment']").text
pulls our commentreview_card.select_one("div[class='fdbk-container__details__verified__purchase']")
to determine whether or not the purchase was verified.process_results()
does exactly that. We then go through and use a for
loop to run process_item()
on each of the items from the file.
def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries)
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" buy_it_now: bool = False is_auction: bool = False auction_end: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_holder = soup.select_one("div[id='srp-river-results']") div_cards = main_holder.select("div[class='s-item__info clearfix']") for div_card in div_cards: name = div_card.select_one("div[class='s-item__title']").text link = div_card.select_one("a").get("href") price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "") buy_it_now = False buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']") if buy_it_now_tag: buy_it_now = True is_auction = False auction_end = None auction_ends_tag = div_card.select_one("span[class='s-item__time-end']") if auction_ends_tag: is_auction = True auction_ends_tag.text search_data = SearchData( name=name, url=link, price=price, buy_it_now=buy_it_now, is_auction=is_auction, auction_end=auction_end ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = soup.select("div[class='fdbk-container__details']") for review_card in review_cards: username_container = review_card.select_one("div[class='fdbk-container__details__info__username']") username = username_container.text comment = review_card.select_one("div[class='fdbk-container__details__comment']").text verified = False verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']") if verified_holder: verified = True review_data = { "name": username, "comment": comment, "verified": verified } print(review_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
DataPipeline
can already do this, we just need another dataclass
to pass into it.ReviewData
class. It's almost identical to SearchData
, it just holds different fields.@dataclassclass ReviewData: name: str = "" comment: str = "" verified: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
name
: the username that left the review.comment
: the comment left by the reviewer.verified
: a boolean. If verified
is True
, the purchase has been verified.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" buy_it_now: bool = False is_auction: bool = False auction_end: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" comment: str = "" verified: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_holder = soup.select_one("div[id='srp-river-results']") div_cards = main_holder.select("div[class='s-item__info clearfix']") for div_card in div_cards: name = div_card.select_one("div[class='s-item__title']").text link = div_card.select_one("a").get("href") price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "") buy_it_now = False buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']") if buy_it_now_tag: buy_it_now = True is_auction = False auction_end = None auction_ends_tag = div_card.select_one("span[class='s-item__time-end']") if auction_ends_tag: is_auction = True auction_ends_tag.text search_data = SearchData( name=name, url=link, price=price, buy_it_now=buy_it_now, is_auction=is_auction, auction_end=auction_end ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = soup.select("div[class='fdbk-container__details']") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review_card in review_cards: username_container = review_card.select_one("div[class='fdbk-container__details__info__username']") username = username_container.text comment = review_card.select_one("div[class='fdbk-container__details__comment']").text verified = False verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']") if verified_holder: verified = True review_data = ReviewData( name=username, comment=comment, verified=verified ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
ReviewData
, we can pass our reviews into the DataPipeline
and store them to a CSV.
process_item()
instead of scrape_search_results()
. Once again, we'll pass the args for process_item()
into executor.map()
as arrays.
def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_item, reader, [location] * len(reader), [retries] * len(reader) )
process_item
is the function we want to call on all the available threads.reader
is the array of items we want to parse.location
and retries
each get passed in as arrays.get_scrapeops_url()
function. All we need to do is add it to requests.get()
. With this line of code, all of our process_item()
calls are routed through the ScrapeOps Proxy API.
response = requests.get(get_scrapeops_url(url, location=location))
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" price: str = "" buy_it_now: bool = False is_auction: bool = False auction_end: str = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" comment: str = "" verified: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_holder = soup.select_one("div[id='srp-river-results']") div_cards = main_holder.select("div[class='s-item__info clearfix']") for div_card in div_cards: name = div_card.select_one("div[class='s-item__title']").text link = div_card.select_one("a").get("href") price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "") buy_it_now = False buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']") if buy_it_now_tag: buy_it_now = True is_auction = False auction_end = None auction_ends_tag = div_card.select_one("span[class='s-item__time-end']") if auction_ends_tag: is_auction = True auction_ends_tag.text search_data = SearchData( name=name, url=link, price=price, buy_it_now=buy_it_now, is_auction=is_auction, auction_end=auction_end ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = soup.select("div[class='fdbk-container__details']") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review_card in review_cards: username_container = review_card.select_one("div[class='fdbk-container__details__info__username']") username = username_container.text comment = review_card.select_one("div[class='fdbk-container__details__comment']").text verified = False verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']") if verified_holder: verified = True review_data = ReviewData( name=username, comment=comment, verified=verified ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_item, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
PAGES
to 1. Scraping a single page already gives us a ton of items to parse, so we'll still be able to get a proper benchmark. Here is the main
we'll be using.
Feel free to tweak it however you'd like in order to change your results.
if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["gpu"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
robots.txt
and their Terms of Service.
When violating these terms, you could be temporarily blocked or even permanently banned from the site.
Public data (data not gated behind a login) is considered public information in most places and therefore free to scrape without any legal hassle.
However, if you need to login to view certain data, that data is considered private and you need to adhere to the site's terms.
If you're not sure that your scraper is legal, consult an attorney.