

How to Scrape eBay With Requests and BeautifulSoup

eBay is one of the oldest active sites on the internet and a great place to find items for sale, whether you want to Buy It Now or bid on auctions. Founded in 1995, eBay has been around for nearly 30 years. While the site has gotten sleeker and more interactive over the years, it's kept the same basic structure. eBay holds an incredibly large dataset, and we can use web scraping to harvest this data.

Throughout this guide, you'll learn how to scrape eBay.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape eBay

Need an eBay scraper? Use this one! All you need to do is replace "gpu" in the keyword_list with your search criteria. For instance, if you want to buy a motherboard, replace "gpu" with "motherboard". After you've done that, place your script in a folder.

Inside this folder, add a config.json file with your ScrapeOps API key.
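The script only reads one field from that file, so a minimal config.json looks like this (replace the placeholder with your own key):

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}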

import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


# Wrap a target URL in the ScrapeOps Proxy API endpoint
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    buy_it_now: bool = False
    is_auction: bool = False
    auction_end: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    comment: str = ""
    verified: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


# Crawl one page of eBay search results and push each listing into the pipeline
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.ebay.com/sch/i.html?_nkw={formatted_keyword}&_pgn={page_number}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True

            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data

            soup = BeautifulSoup(response.text, "html.parser")

            main_holder = soup.select_one("div[id='srp-river-results']")

            div_cards = main_holder.select("div[class='s-item__info clearfix']")

            for div_card in div_cards:
                name = div_card.select_one("div[class='s-item__title']").text
                link = div_card.select_one("a").get("href")
                price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")

                buy_it_now = False
                buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
                if buy_it_now_tag:
                    buy_it_now = True

                is_auction = False
                auction_end = None
                auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
                if auction_ends_tag:
                    is_auction = True
                    auction_end = auction_ends_tag.text

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    buy_it_now=buy_it_now,
                    is_auction=is_auction,
                    auction_end=auction_end
                )

                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


# Run scrape_search_results() on multiple pages concurrently
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


# Look up a single item from the crawl CSV and scrape its seller feedback
def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.select("div[class='fdbk-container__details']")

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

                for review_card in review_cards:
                    username_container = review_card.select_one("div[class='fdbk-container__details__info__username']")
                    username = username_container.text
                    comment = review_card.select_one("div[class='fdbk-container__details__comment']").text

                    verified = False
                    verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']")
                    if verified_holder:
                        verified = True

                    review_data = ReviewData(
                        name=username,
                        comment=comment,
                        verified=verified
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


# Read the crawl CSV and process each item concurrently
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_item,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["gpu"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Then, you can run the file with:

python name_of_your_python_file.py

How To Architect Our eBay Scraper

To scrape eBay, we need to build two scrapers.

  1. Our first one will be a result crawler. The crawler will scrape important item details from our search results.
  2. The second scraper will be an item scraper. The scraper will look up each item and scrape the reviews.

Our crawler needs to do the following:

  1. Perform a search and parse the results.
  2. Use pagination so we can get our results in batches.
  3. Store the data we've extracted.
  4. Concurrently perform tasks 1 through 3 so we can scrape multiple pages at the same time.
  5. Use the ScrapeOps Proxy API in order to get past anti-bots and anything else that might block us.

The scraper will run this set of instructions:

  1. Read the CSV file generated by our crawler.
  2. Lookup and parse each row from our CSV file.
  3. Store review data for each item in a new report.
  4. Use concurrency to perform tasks 2 and 3 on multiple items simultaneously.
  5. Once again, integrate with the ScrapeOps Proxy to get past any roadblocks.

Understanding How To Scrape eBay

Step 1: How To Request eBay Pages

To fetch a page on eBay, we need to perform a GET request. GET requests are simple; they're exactly what they sound like: we use them to get information.

When you go to a site in your browser, you're actually performing a GET for the HTML of the site. Your browser then goes through and reads the HTML and displays it to you as a website.

To search GPUs on eBay, we perform a GET on the following URL:

https://www.ebay.com/sch/i.html?_nkw=gpu&_pgn=0
  • After the ?, we have different parameters that are fed into our URL.
  • _nkw=gpu is the one we'll pay attention to right now.
  • _nkw tells eBay that we want to perform a search.
  • gpu is the item we're searching for.
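As a quick illustration, here's a minimal sketch of that request in Python (no proxy yet, just the raw GET):

import requests

# Fetch the first page of GPU search results directly
url = "https://www.ebay.com/sch/i.html?_nkw=gpu&_pgn=0"
response = requests.get(url)
print(response.status_code)  # 200 means eBay returned the page's HTML
print(response.text[:500])   # first 500 characters of the HTML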

Take a look at the image below and see for yourself.

eBay Search Results Page

Item URLs on eBay are laid out a bit differently. Each item has a unique number and the rest of the URL is basically just a bunch of jumbled nonsense.

Since we'll be getting these URLs from the search results, we don't have to worry about reconstructing these ones.

eBay product detail page


Step 2: How To Extract Data From eBay Results and Pages

eBay is pretty much an old-fashioned scraping job. We're dealing with static information embedded within the HTML of the page. Both the search page and the item page use a div as a container card for information.

On the results page, each item gets its own container and we parse the information from that container. On the item page, each review gets its own container and we use the same process to pull out the results.

Here, you can see the container card for search result items. Each result card has a class of s-item__info clearfix.

With BeautifulSoup, we can go and extract all of our relevant information from this card.
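For example, a minimal sketch of grabbing those cards (assuming html already holds the page source from a request like the one shown earlier):

from bs4 import BeautifulSoup

# Parse the page and select every result card by its class
soup = BeautifulSoup(html, "html.parser")
div_cards = soup.select("div[class='s-item__info clearfix']")
for card in div_cards:
    title = card.select_one("div[class='s-item__title']")
    print(title.text if title else "No title")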

eBay Inspect Search Results Page

Review data is pretty similar. All review details are embedded inside a container with the class of fdbk-container__details.

eBay Inspect Review

In comparison to other sites from our "How To Scrape" series, parsing eBay is a very old school job.


Step 3: How To Control Pagination

Remember the search results page you saw earlier?

We had a URL of:

https://www.ebay.com/sch/i.html?_nkw=gpu&_pgn=0
  • Our parameters get separated by & and _pgn is our only other parameter.
  • _pgn represents our page number. So in this URL, our page number is 0.

Our fully formatted URLs will look like this:

https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}
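Here's a small sketch of how we'll build those paginated URLs in code:

keyword = "gpu"
pages = 3

# One search URL per page number, 0 through pages-1
urls = [
    f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page}"
    for page in range(pages)
]
for url in urls:
    print(url)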

Step 4: Geolocated Data

To handle geolocated data, we'll be using the ScrapeOps Proxy API. ScrapeOps can route us through any country they support.

All we need to do is add the country param when we perform a request to ScrapeOps.

  • If we want to appear from the US, we can simply pass in us as our country.
  • If we want to appear from the UK, we would pass in uk.
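In code, that just means changing the location argument passed to the get_scrapeops_url() helper from the TLDR script; a quick sketch:

# Route the same eBay search through different countries via the proxy
us_url = get_scrapeops_url("https://www.ebay.com/sch/i.html?_nkw=gpu", location="us")
uk_url = get_scrapeops_url("https://www.ebay.com/sch/i.html?_nkw=gpu", location="uk")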

Setting Up Our eBay Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir ebay-scraper

cd ebay-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate
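If you're on Windows, the equivalent activation command is venv\Scripts\activate.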

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build An eBay Search Crawler

Finally, we can start coding. We'll start by building our crawler. We'll add the following in order:

  • Parsing
  • Pagination
  • Data Storage
  • Concurrency
  • Proxy Integration

Step 1: Create Simple Search Data Parser

We'll get started by creating a search parser.

The goal of this function is pretty simple: Perform a search and pull data from the search results.

In the code below, alongside our parser, we set up our basic structure with logging, error handling, and retry logic.

The parsing function finds a result card and extracts information from the card.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

main_holder = soup.select_one("div[id='srp-river-results']")

div_cards = main_holder.select("div[class='s-item__info clearfix']")


for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")

buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True

is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
auction_ends_tag.text

search_data = {
"name": name,
"url": url,
"price": price,
"buy_it_now": buy_it_now,
"is_auction": is_auction,
"auction_end": auction_end
}
print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

Step 2: Add Pagination

As we discussed earlier, to paginate our results, all we need to do is add a parameter to our URL: _pgn={page_number}.

In this section, we'll also add a start_scrape() function. The entire purpose of this function is to run scrape_search_results() on multiple pages.

Here is start_scrape().

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)

At this point, our full script looks like this.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

main_holder = soup.select_one("div[id='srp-river-results']")

div_cards = main_holder.select("div[class='s-item__info clearfix']")


for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")

buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True

is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
auction_ends_tag.text

search_data = {
"name": name,
"url": url,
"price": price,
"buy_it_now": buy_it_now,
"is_auction": is_auction,
"auction_end": auction_end
}
print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

With pagination, we get batches of results that we can control.


Step 3: Storing the Scraped Data

Next, we'll be storing our data. We'll add two data objects in order to do this.

Our first one will be our SearchData class and the second one will be our DataPipeline.

  • SearchData is used to represent the result cards we find on the site.
  • DataPipeline takes in a dataclass (like SearchData) and pipes it to a CSV.

Along with storing the data, our pipeline also removes duplicates.

Here is our SearchData.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    buy_it_now: bool = False
    is_auction: bool = False
    auction_end: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here is our DataPipeline.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            # close_pipeline() uses time.sleep, so the script needs "import time" at the top
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
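
Here's a quick standalone sketch (with hypothetical values) of how SearchData and DataPipeline fit together:

# Create a pipeline that writes to gpu.csv, add one item, then flush it to disk
pipeline = DataPipeline(csv_filename="gpu.csv")
item = SearchData(
    name="Example GPU",
    url="https://www.ebay.com/itm/1234567890",
    price="199.99"
)
pipeline.add_data(item)
pipeline.close_pipeline()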

In our fully updated code, instead of printing each search_data item, we turn it into a SearchData item and pass it into the DataPipeline.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

main_holder = soup.select_one("div[id='srp-river-results']")

div_cards = main_holder.select("div[class='s-item__info clearfix']")


for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")

buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True

is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
auction_ends_tag.text

search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 4: Adding Concurrency

Concurrency is key to maximizing our speed and efficiency. In this section, we'll be adding support for concurrency using multithreading. ThreadPoolExecutor will be used to control our threads.

Let's refactor start_scrape().

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

It may look a bit weird, but once you understand it, this function is actually pretty simple. Here are the args we pass into executor.map():

  • scrape_search_results is the function we'd like to run on every available thread.
  • All the arguments that scrape_search_results() normally takes are passed in as lists; executor.map() then hands one element from each list to each call (see the sketch below).
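If the list-of-arguments pattern looks odd, here's a tiny standalone sketch (unrelated to eBay) of how executor.map() pairs those lists up:

import concurrent.futures

def greet(name, city):
    return f"{name} from {city}"

names = ["Alice", "Bob", "Carol"]
cities = ["Austin", "Boston", "Chicago"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Calls greet("Alice", "Austin"), greet("Bob", "Boston"), ... on separate threads
    results = list(executor.map(greet, names, cities))

print(results)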

With concurrency support, our full script looks like this.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

main_holder = soup.select_one("div[id='srp-river-results']")

div_cards = main_holder.select("div[class='s-item__info clearfix']")


for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")

buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True

is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
auction_ends_tag.text

search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 5: Bypassing Anti-Bots

Our crawler definitely wouldn't be finished without proxy support. We're going to use the ScrapeOps Proxy API to get past everything that might stand in our way. We're not malware, but our crawler looks very abnormal and that causes anti-bots to try and block us.

Take a look at the function below. It holds the key to everything: we take in several arguments (including the URL we want to scrape) and combine them into a ScrapeOps proxy URL.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
  • api_key: your ScrapeOps API key.
  • url: the url you want to scrape.
  • country: the country you'd like to be routed through.
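Using it is as simple as wrapping whatever URL we were about to request; a quick sketch:

# Every request now goes through the ScrapeOps proxy endpoint
target_url = "https://www.ebay.com/sch/i.html?_nkw=gpu&_pgn=0"
response = requests.get(get_scrapeops_url(target_url, location="us"))
print(response.status_code)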

Here is our finalized crawler.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

main_holder = soup.select_one("div[id='srp-river-results']")

div_cards = main_holder.select("div[class='s-item__info clearfix']")


for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")

buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True

is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
auction_ends_tag.text

search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

Before we test this out in production, let's take a look at our main block. I've set PAGES to 5, and all the other constants remain the same.

If you'd like to tweak your results, feel free to change any of them yourself.
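For reference, these are the only lines you'd normally touch for a production run (the values below match the run described here). The full production script follows.

MAX_RETRIES = 3          # attempts per page before giving up
MAX_THREADS = 5          # pages crawled concurrently
PAGES = 5                # number of result pages per keyword
LOCATION = "us"          # country routed through the ScrapeOps proxy
keyword_list = ["gpu"]   # search terms to crawl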

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

main_holder = soup.select_one("div[id='srp-river-results']")

div_cards = main_holder.select("div[class='s-item__info clearfix']")


for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")

buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True

is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
auction_ends_tag.text

search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Here are the results from the crawl.

Crawler Performance Results

We crawled 5 pages in 8.41 seconds. This comes out to 1.68 seconds per page!


Build An eBay Scraper

In this section, we're going to build a scraper that goes through and parses all of our results from the file generated by the crawler. We'll add all of the following in order:

  • Parsing
  • Reading the CSV file
  • Data Storage
  • Concurrency
  • Proxy support

Step 1: Create Simple Item Data Parser

Let's get started by building another parser. Structurally, this parsing function is almost identical to the one we made earlier. The biggest difference is the CSS selectors that we use.

Here is our parsing function.

def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.select("div[class='fdbk-container__details']")

                for review_card in review_cards:
                    username_container = review_card.select_one("div[class='fdbk-container__details__info__username']")
                    username = username_container.text
                    comment = review_card.select_one("div[class='fdbk-container__details__comment']").text

                    verified = False
                    verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']")
                    if verified_holder:
                        verified = True

                    review_data = {
                        "name": username,
                        "comment": comment,
                        "verified": verified
                    }

                    print(review_data)

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

In this function we perform the following actions:

  • "div[class='fdbk-container__details']" is used to find each of the review_cards by their CSS selector.
  • In each of the review_cards, we go through and pull the following:
    • username with review_card.select_one("div[class='fdbk-container__details__info__username']")
    • review_card.select_one("div[class='fdbk-container__details__comment']").text pulls our comment
    • We use review_card.select_one("div[class='fdbk-container__details__verified__purchase']") to determine whether or not the purchase was verified.

Step 2: Loading URLs To Scrape

Now, we need to load our URLs so we can parse them. process_results() does exactly that. We then use a for loop to run process_item() on each item from the file.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_item(row, location, retries=retries)

At this point in the project, this is how our script looks.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

main_holder = soup.select_one("div[id='srp-river-results']")

div_cards = main_holder.select("div[class='s-item__info clearfix']")


for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")

buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True

is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
auction_ends_tag.text

search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.select("div[class='fdbk-container__details']")

for review_card in review_cards:
username_container = review_card.select_one("div[class='fdbk-container__details__info__username']")
username = username_container.text
comment = review_card.select_one("div[class='fdbk-container__details__comment']").text

verified = False
verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']")
if verified_holder:
verified = True

review_data = {
"name": username,
"comment": comment,
"verified": verified
}

print(review_data)

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

Now that we're pulling the proper review data, we need to store it.

  1. Our DataPipeline can already do this; we just need another dataclass to pass into it.
  2. Here, we'll create a ReviewData class. It's almost identical to SearchData; it just holds different fields.

@dataclass
class ReviewData:
    name: str = ""
    comment: str = ""
    verified: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

This class holds the following:

  • name: the username that left the review.
  • comment: the comment left by the reviewer.
  • verified: a boolean. If verified is True, the purchase has been verified.
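
As with SearchData, each ReviewData instance gets passed into a DataPipeline; here's a minimal sketch with hypothetical values:

# One review CSV per item, named after the listing
review_pipeline = DataPipeline(csv_filename="Example-GPU.csv")
review_pipeline.add_data(ReviewData(name="some_buyer", comment="Great seller!", verified=True))
review_pipeline.close_pipeline()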

Now that we're storing reviews, our code looks like this.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
    name: str = ""
    comment: str = ""
    verified: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.ebay.com/sch/i.html?_nkw={formatted_keyword}&_pgn={page_number}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data

            soup = BeautifulSoup(response.text, "html.parser")

            main_holder = soup.select_one("div[id='srp-river-results']")

            div_cards = main_holder.select("div[class='s-item__info clearfix']")

            for div_card in div_cards:
                name = div_card.select_one("div[class='s-item__title']").text
                link = div_card.select_one("a").get("href")
                price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")

                buy_it_now = False
                buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
                if buy_it_now_tag:
                    buy_it_now = True

                is_auction = False
                auction_end = None
                auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
                if auction_ends_tag:
                    is_auction = True
                    auction_end = auction_ends_tag.text

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    buy_it_now=buy_it_now,
                    is_auction=is_auction,
                    auction_end=auction_end
                )

                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.select("div[class='fdbk-container__details']")

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

                for review_card in review_cards:
                    username_container = review_card.select_one("div[class='fdbk-container__details__info__username']")
                    username = username_container.text
                    comment = review_card.select_one("div[class='fdbk-container__details__comment']").text

                    verified = False
                    verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']")
                    if verified_holder:
                        verified = True

                    review_data = ReviewData(
                        name=username,
                        comment=comment,
                        verified=verified
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_item(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

By adding ReviewData, we can pass our reviews into the DataPipeline and write them out to a CSV file.
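If you want to see that flow in isolation, here's a minimal sketch (the filename and review values are purely illustrative) of how a single review travels through the pipeline:

# Minimal sketch -- the filename and review values are illustrative.
pipeline = DataPipeline(csv_filename="example-item.csv")
pipeline.add_data(ReviewData(name="buyer123", comment="Great seller", verified=True))
pipeline.close_pipeline()  # flushes whatever is left in the queue to the CSV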


Step 4: Adding Concurrency

Just like before, we'll add concurrency with a small refactor. This time, we'll be calling process_item() instead of scrape_search_results(). Once again, we'll pass the args for process_item() into executor.map() as arrays.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

Pay attention to the args here:

  • process_item is the function we want to call on all the available threads.
  • reader is the array of items we want to parse.
  • location and retries each get passed in as arrays.
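If the way executor.map() lines these arguments up seems opaque, here's a tiny standalone sketch of the same pattern (greet() is a made-up stand-in for process_item()):

import concurrent.futures

# Toy function standing in for process_item().
def greet(name, location, retries):
    return f"{name} / {location} / {retries}"

names = ["item-1", "item-2", "item-3"]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Each call gets one element from each list: greet("item-1", "us", 3), and so on.
    results = list(executor.map(greet, names, ["us"] * len(names), [3] * len(names)))
print(results)

Each list needs to be the same length as reader, which is why location and retries get multiplied by len(reader).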

Step 5: Bypassing Anti-Bots

We already have our get_scrapeops_url() function. All we need to do is add it to requests.get(). With this line of code, all of our process_item() calls are routed through the ScrapeOps Proxy API.

response = requests.get(get_scrapeops_url(url, location=location))
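Under the hood, get_scrapeops_url() simply wraps the target page in a query string, so the request that actually leaves your machine looks roughly like this (the API key and item URL below are placeholders):

from urllib.parse import urlencode

# Rough illustration of the proxied URL -- the API key and item number are placeholders.
params = {
    "api_key": "YOUR-API-KEY",
    "url": "https://www.ebay.com/itm/1234567890",
    "country": "us",
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(params))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-API-KEY&url=https%3A%2F%2Fwww.ebay.com%2Fitm%2F1234567890&country=us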

Here is our final code including both the crawler and the scraper.

import os
import csv
import requests
import json
import logging
import time  # used by DataPipeline.close_pipeline() for its short wait
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    buy_it_now: bool = False
    is_auction: bool = False
    auction_end: str = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field