How to Scrape eBay With Requests and BeautifulSoup
eBay is one of the oldest active sites on the internet, and it's a great place to find items for sale, whether you're looking to Buy It Now or bid on items at auction. Founded in 1995, eBay has been around for nearly 30 years. While the site has gotten sleeker and more interactive over the years, it's kept the same basic structure. eBay has an incredibly large dataset, and we can use web scraping to harvest this data.
Throughout this guide, you'll learn how to scrape eBay.
- TLDR How to Scrape eBay
- How To Architect Our Scraper
- Understanding How To Scrape eBay
- Setting Up Our eBay Scraper
- Build an eBay Search Crawler
- Build an eBay Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape eBay
Need an eBay scraper? Use this one! All you need to do is replace "gpu" in the keyword_list with your search criteria. For instance, if you want to buy a motherboard, replace "gpu" with "motherboard". After you've done that, place your script in a folder. Inside this folder, add a config.json file with your ScrapeOps API key.
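If you don't have a config.json yet, the snippet below is a quick, optional way to generate one (the key name api_key is what the script reads; the placeholder value is yours to replace).
import json

# One-time helper: create config.json with your ScrapeOps API key.
# Replace the placeholder string with your real key before running the scraper.
with open("config.json", "w") as config_file:
    json.dump({"api_key": "YOUR-SCRAPEOPS-API-KEY"}, config_file, indent=2)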
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
comment: str = ""
verified: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
main_holder = soup.select_one("div[id='srp-river-results']")
div_cards = main_holder.select("div[class='s-item__info clearfix']")
for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")
buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True
is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
                        auction_end = auction_ends_tag.text
search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.select("div[class='fdbk-container__details']")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for review_card in review_cards:
username_container = review_card.select_one("div[class='fdbk-container__details__info__username']")
username = username_container.text
comment = review_card.select_one("div[class='fdbk-container__details__comment']").text
verified = False
verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']")
if verified_holder:
verified = True
review_data = ReviewData(
name=username,
comment=comment,
verified=verified
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Then, you can run the file with:
python name_of_your_python_file.py
How To Architect Our eBay Scraper
To scrape eBay, we need to build two scrapers.
- Our first one will be a result crawler. The crawler will scrape important item details from our search results.
- The second scraper will be an item scraper. The scraper will look up each item and scrape the reviews.
Our crawler needs to do the following:
- Perform a search and parse the results.
- Use pagination so we can get our results in batches.
- Store the data we've extracted.
- Concurrently perform tasks 1 through 3 so we can scrape multiple pages at the same time.
- Use the ScrapeOps Proxy API in order to get past anti-bots and anything else that might block us.
The scraper will run this set of instructions:
- Read the CSV file generated by our crawler.
- Lookup and parse each row from our CSV file.
- Store review data for each item in a new report.
- Use concurrency to perform tasks 2 and 3 on multiple items simultaneously.
- Once again, integrate with the ScrapeOps Proxy to get past any roadblocks.
Understanding How To Scrape eBay
Step 1: How To Request eBay Pages
To fetch a page on eBay, we need to perform a GET request. GET requests are simple; they're exactly what they sound like: we use them to get information.
When you go to a site in your browser, you're actually performing a GET for the HTML of the site. Your browser then goes through and reads the HTML and displays it to you as a website.
To search GPUs on eBay, we perform a GET on the following URL:
https://www.ebay.com/sch/i.html?_nkw=gpu&_pgn=0
- After the ?, we have different parameters that are fed into our URL. _nkw=gpu is the one we'll pay attention to right now.
- _nkw tells eBay that we want to perform a search.
- gpu is the item we're searching for.
Take a look at the image below and see for yourself.
Item URLs on eBay are laid out a bit differently. Each item has a unique number and the rest of the URL is basically just a bunch of jumbled nonsense.
Since we'll be getting these URLs from the search results, we don't have to worry about reconstructing these ones.
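Before we start parsing anything, here's a minimal sketch of that GET request using Python's requests library. Without a proxy, eBay may serve a bot check instead of real results, so treat this purely as a quick sanity check.
import requests

# Fetch the first page of GPU search results and confirm we got a response.
url = "https://www.ebay.com/sch/i.html?_nkw=gpu&_pgn=0"
response = requests.get(url)
print(response.status_code)
print(len(response.text), "characters of HTML")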
Step 2: How To Extract Data From eBay Results and Pages
eBay is pretty much an old-fashioned scraping job. We're dealing with static information embedded within the HTML of the page. Both the search page and the item page use a div as a container card for information.
On the results page, each item gets its own container and we parse the information from that container. On the item page, each review gets its own container and we use the same process to pull out the results.
Here, you can see the container card for search result items. Each result card has a class of s-item__info clearfix.
With BeautifulSoup, we can go and extract all of our relevant information from this card.
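As a quick illustration of that idea (a standalone sketch, not the final parser), the selectors shown above look like this in BeautifulSoup:
import requests
from bs4 import BeautifulSoup

# Pull one page of results and print the title and price from each result card.
# Without a proxy, eBay may return a bot-check page that contains no result cards.
response = requests.get("https://www.ebay.com/sch/i.html?_nkw=gpu&_pgn=0")
soup = BeautifulSoup(response.text, "html.parser")
for card in soup.select("div[class='s-item__info clearfix']"):
    title = card.select_one("div[class='s-item__title']")
    price = card.select_one("span[class='s-item__price']")
    if title and price:
        print(title.text, "->", price.text)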
Review data is pretty similar. All review details are embedded inside a container with the class of fdbk-container__details.
In comparison to other sites from our "How To Scrape" series, parsing eBay is a very old-school job.
Step 3: How To Control Pagination
Remember the search results page you saw earlier?
We had a URL of:
https://www.ebay.com/sch/i.html?_nkw=gpu&_pgn=0
- Our parameters get separated by & and _pgn is our only other parameter.
- _pgn represents our page number. So in this URL, our page number is 0.
Our fully formatted URLs will look like this:
https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}
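Since everything lives in the query string, generating page URLs is just string formatting. Here's a tiny sketch:
# Build the first three paginated search URLs for a keyword.
keyword = "gpu"
for page_number in range(3):
    print(f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}")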
Step 4: Geolocated Data
To handle geolocated data, we'll be using the ScrapeOps Proxy API. ScrapeOps can route us through any country they support.
All we need to do is add the country
param when we perform a request to ScrapeOps.
- If we want to appear from the US, we can simply pass in us as our country.
- If we want to appear from the UK, we would pass in uk.
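Here's roughly how that looks in practice, using the same get_scrapeops_url() helper we'll build into the crawler later (the API_KEY value below is a placeholder):
from urllib.parse import urlencode

API_KEY = "YOUR-SCRAPEOPS-API-KEY"

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Route the same search through the US and then the UK.
print(get_scrapeops_url("https://www.ebay.com/sch/i.html?_nkw=gpu", location="us"))
print(get_scrapeops_url("https://www.ebay.com/sch/i.html?_nkw=gpu", location="uk"))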
Setting Up Our eBay Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir ebay-scraper
cd ebay-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build An eBay Search Crawler
Finally, we can start coding. We'll start by building our crawler. We'll add the following in order:
- Parsing
- Pagination
- Data Storage
- Concurrency
- Proxy Integration
Step 1: Create Simple Search Data Parser
We'll get started by creating a search parser.
The goal of this function is pretty simple: Perform a search and pull data from the search results.
In the code below, alongside our parser, we set up our basic structure with logging, error handling, and retry logic.
The parsing function finds a result card and extracts information from the card.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
main_holder = soup.select_one("div[id='srp-river-results']")
div_cards = main_holder.select("div[class='s-item__info clearfix']")
for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")
buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True
is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
                        auction_end = auction_ends_tag.text
search_data = {
"name": name,
"url": url,
"price": price,
"buy_it_now": buy_it_now,
"is_auction": is_auction,
"auction_end": auction_end
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
Step 2: Add Pagination
As we discussed earlier, to paginate our results, all we need to do is add a parameter to our URL: _pgn={page_number}.
In this section, we'll also add a start_scrape() function. The entire purpose of this function is to run scrape_search_results() on multiple pages.
Here is start_scrape()
.
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
At this point, our full script looks like this.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
main_holder = soup.select_one("div[id='srp-river-results']")
div_cards = main_holder.select("div[class='s-item__info clearfix']")
for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")
buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True
is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
                        auction_end = auction_ends_tag.text
search_data = {
"name": name,
"url": url,
"price": price,
"buy_it_now": buy_it_now,
"is_auction": is_auction,
"auction_end": auction_end
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
With pagination, we get batches of results that we can control.
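For example, once the functions above are defined in your file, crawling the first three pages of GPU results is a single call:
# Crawl pages 0, 1 and 2 of the "gpu" search results.
start_scrape("gpu", 3, "us", retries=3)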
Step 3: Storing the Scraped Data
Next, we'll be storing our data. We'll add two data objects in order to do this.
Our first one will be our SearchData
class and the second one will be our DataPipeline
.
- SearchData is used to represent the result cards we find on the site.
- DataPipeline takes in a dataclass (like SearchData) and pipes it to a CSV.
Along with storing the data, our pipeline also removes duplicates.
Here is our SearchData
.
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is our DataPipeline
.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
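Before we wire these into the crawler, here's a quick standalone sketch of how the two pieces fit together (purely for illustration; in the real crawler, SearchData objects are built from parsed result cards):
# Assumes SearchData and DataPipeline from above are defined in the same file.
pipeline = DataPipeline(csv_filename="example.csv")
item = SearchData(name="Example GPU", url="https://www.ebay.com/itm/123", price="199.99")
pipeline.add_data(item)    # queued for storage
pipeline.add_data(item)    # duplicate name, so it gets logged and dropped
pipeline.close_pipeline()  # flushes whatever is left in the queue to example.csv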
In our fully updated code, instead of printing each search_data
item, we turn it into a SearchData
item and pass it into the DataPipeline
.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
main_holder = soup.select_one("div[id='srp-river-results']")
div_cards = main_holder.select("div[class='s-item__info clearfix']")
for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")
buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True
is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
                        auction_end = auction_ends_tag.text
search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 4: Adding Concurrency
Concurrency is key to maximizing our speed and efficiency. In this section, we'll be adding support for concurrency using multithreading. ThreadPoolExecutor
will be used to control our threads.
Let's refactor start_scrape()
.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
It may look a bit weird, but once you understand it, this function is actually pretty simple. Here are the args we pass into executor.map():
- scrape_search_results: the function we'd like to run on every available thread.
- All of the arguments that scrape_search_results() would normally take, passed in as arrays; executor.map() then feeds one element from each array into the function on each thread.
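If the argument arrays feel abstract, here's a toy sketch of the same pattern with a trivial function (not part of the scraper, just an illustration of how executor.map() fans arguments out across threads):
import concurrent.futures

def demo(keyword, page):
    return f"{keyword} - page {page}"

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for result in executor.map(demo, ["gpu"] * 3, range(3)):
        print(result)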
With concurrency support, our full script looks like this.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
main_holder = soup.select_one("div[id='srp-river-results']")
div_cards = main_holder.select("div[class='s-item__info clearfix']")
for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")
buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True
is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
                        auction_end = auction_ends_tag.text
search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 5: Bypassing Anti-Bots
Our crawler definitely wouldn't be finished without proxy support. We're going to use the ScrapeOps Proxy API to get past everything that might stand in our way. We're not malware, but our crawler looks very abnormal and that causes anti-bots to try and block us.
Take a look at the function below. This holds the key to everything. We take in several arguments (including the URL we want to scrape) and combine them all into a ScrapeOps proxied URL.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
- api_key: your ScrapeOps API key.
- url: the url you want to scrape.
- country: the country you'd like to be routed through.
Here is our finalized crawler.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
main_holder = soup.select_one("div[id='srp-river-results']")
div_cards = main_holder.select("div[class='s-item__info clearfix']")
for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")
buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True
is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
                        auction_end = auction_ends_tag.text
search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Before we test this out in production, let's take a look at our main
. I've set PAGES
to 5 and all the other constants remain the same.
If you'd like to tweak your results, feel free to change any of them yourself.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
main_holder = soup.select_one("div[id='srp-river-results']")
div_cards = main_holder.select("div[class='s-item__info clearfix']")
for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")
buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True
is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
                        auction_end = auction_ends_tag.text
search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Here are the results from the crawl.
We crawled 5 pages in 8.41 seconds. This comes out to 1.68 seconds per page!
Build An eBay Scraper
In this section, we're going to build a scraper that goes through and parses all of our results from the file generated by the crawler. We'll add all of the following in order:
- Parsing
- Reading the CSV file
- Data Storage
- Concurrency
- Proxy support
Step 1: Create Simple Item Data Parser
Let's get started by building another parser. Structurally, this parsing function is almost identical to the one we made earlier. The biggest difference is the CSS selectors that we use.
Here is our parsing function.
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.select("div[class='fdbk-container__details']")
for review_card in review_cards:
username_container = review_card.select_one("div[class='fdbk-container__details__info__username']")
username = username_container.text
comment = review_card.select_one("div[class='fdbk-container__details__comment']").text
verified = False
verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']")
if verified_holder:
verified = True
review_data = {
"name": username,
"comment": comment,
"verified": verified
}
print(review_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
In this function we perform the following actions:
- "div[class='fdbk-container__details']" is used to find each of the review_cards by their CSS selector.
- In each of the review_cards, we go through and pull the following:
  - username with review_card.select_one("div[class='fdbk-container__details__info__username']")
  - review_card.select_one("div[class='fdbk-container__details__comment']").text pulls our comment
  - We use review_card.select_one("div[class='fdbk-container__details__verified__purchase']") to determine whether or not the purchase was verified.
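If you want to try these selectors on their own, here's a sketch of the core of that loop (item_url is a placeholder for illustration; in the scraper, the URL comes from the crawler's CSV):
import requests
from bs4 import BeautifulSoup

# item_url is a placeholder; swap in a real item URL from your crawl results.
item_url = "https://www.ebay.com/itm/123456789012"
response = requests.get(item_url)
soup = BeautifulSoup(response.text, "html.parser")
for card in soup.select("div[class='fdbk-container__details']"):
    user = card.select_one("div[class='fdbk-container__details__info__username']")
    comment = card.select_one("div[class='fdbk-container__details__comment']")
    verified = card.select_one("div[class='fdbk-container__details__verified__purchase']") is not None
    if user and comment:
        print(user.text, "| verified:", verified, "|", comment.text[:60])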
Step 2: Loading URLs To Scrape
Now, we need to load our URLs so we can parse them. process_results() does exactly that. We then go through and use a for loop to run process_item() on each of the items from the file.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
At this point in the project, this is how our script looks.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
main_holder = soup.select_one("div[id='srp-river-results']")
div_cards = main_holder.select("div[class='s-item__info clearfix']")
for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")
buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True
is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
                        auction_end = auction_ends_tag.text
search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.select("div[class='fdbk-container__details']")
for review_card in review_cards:
username_container = review_card.select_one("div[class='fdbk-container__details__info__username']")
username = username_container.text
comment = review_card.select_one("div[class='fdbk-container__details__comment']").text
verified = False
verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']")
if verified_holder:
verified = True
review_data = {
"name": username,
"comment": comment,
"verified": verified
}
print(review_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
Now that we're pulling the proper review data, we need to store it.
- Our DataPipeline can already do this, we just need another dataclass to pass into it.
- Here, we'll create a ReviewData class. It's almost identical to SearchData, it just holds different fields.
@dataclass
class ReviewData:
name: str = ""
comment: str = ""
verified: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
This class holds the following:
- name: the username that left the review.
- comment: the comment left by the reviewer.
- verified: a boolean. If verified is True, the purchase has been verified.
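Since ReviewData runs through the same DataPipeline, storing reviews looks just like storing search results. A quick illustration (assuming the classes above are defined in the same file):
review_pipeline = DataPipeline(csv_filename="example-item-reviews.csv")
review_pipeline.add_data(ReviewData(name="some_buyer", comment="Great seller!", verified=True))
review_pipeline.close_pipeline()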
Now that we're storing reviews, our code looks like this.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
comment: str = ""
verified: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.ebay.com/sch/i.html?_nkw={keyword}&_pgn={page_number}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
main_holder = soup.select_one("div[id='srp-river-results']")
div_cards = main_holder.select("div[class='s-item__info clearfix']")
for div_card in div_cards:
name = div_card.select_one("div[class='s-item__title']").text
link = div_card.select_one("a").get("href")
price = div_card.select_one("span[class='s-item__price']").text.replace("$", "").replace(",", "")
buy_it_now = False
buy_it_now_tag = div_card.select_one("span[class='s-item__dynamic s-item__formatBuyItNow']")
if buy_it_now_tag:
buy_it_now = True
is_auction = False
auction_end = None
auction_ends_tag = div_card.select_one("span[class='s-item__time-end']")
if auction_ends_tag:
is_auction = True
                    auction_end = auction_ends_tag.text
search_data = SearchData(
name=name,
url=link,
price=price,
buy_it_now=buy_it_now,
is_auction=is_auction,
auction_end=auction_end
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.select("div[class='fdbk-container__details']")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for review_card in review_cards:
username_container = review_card.select_one("div[class='fdbk-container__details__info__username']")
username = username_container.text
comment = review_card.select_one("div[class='fdbk-container__details__comment']").text
verified = False
verified_holder = review_card.select_one("div[class='fdbk-container__details__verified__purchase']")
if verified_holder:
verified = True
review_data = ReviewData(
name=username,
comment=comment,
verified=verified
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["gpu"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
By adding `ReviewData`, we can pass our reviews into the `DataPipeline` and store them to a CSV.
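To see that flow in isolation, here's a minimal, hypothetical sketch (the filename is made up; `DataPipeline` and `ReviewData` come from the code above) of how a parsed review ends up in a CSV file:

# Hypothetical standalone example -- in the real scraper, each item page
# gets its own pipeline named after the listing.
review_pipeline = DataPipeline(csv_filename="example-item-reviews.csv")

review_pipeline.add_data(ReviewData(
    name="some_buyer",
    comment="Arrived quickly, works as described.",
    verified=True
))

# Flush whatever is still sitting in the storage queue out to the CSV.
review_pipeline.close_pipeline()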
Step 4: Adding Concurrency
Just like before, we'll add concurrency with a small refactor. This time, we'll be calling `process_item()` instead of `scrape_search_results()`. Once again, we'll pass the args for `process_item()` into `executor.map()` as arrays.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
Pay attention to the args here:
- `process_item` is the function we want to call on all the available threads.
- `reader` is the array of items we want to parse.
- `location` and `retries` each get passed in as arrays, one element per row (see the sketch after this list).
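If the parallel-array pattern looks strange, here's a tiny, self-contained sketch with a toy function and made-up values (nothing from the scraper) showing how `executor.map()` pairs the arguments up, one element from each iterable per call:

import concurrent.futures

# Toy stand-in for process_item: each call gets one element from each iterable.
def process_item_demo(row, location, retries):
    return f"row={row}, location={location}, retries={retries}"

reader = [{"url": "item-1"}, {"url": "item-2"}, {"url": "item-3"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        process_item_demo,
        reader,                  # one row per call
        ["us"] * len(reader),    # the same location, repeated for every call
        [3] * len(reader)        # the same retry count, repeated for every call
    )
    for result in results:
        print(result)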
Step 5: Bypassing Anti-Bots
We already have our `get_scrapeops_url()` function. All we need to do is wrap the url we pass into `requests.get()`. With this line of code, all of our `process_item()` calls are routed through the ScrapeOps Proxy API.
response = requests.get(get_scrapeops_url(url, location=location))
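If you're curious what that wrapped request actually looks like, here's a small, hypothetical sketch (placeholder API key and item URL) of the proxy URL that `get_scrapeops_url()` builds before `requests.get()` fires:

# Hypothetical example -- the item URL is made up and the API key shown
# in the output is a placeholder for the one loaded from config.json.
target_url = "https://www.ebay.com/itm/1234567890"
print(get_scrapeops_url(target_url, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-API-KEY&url=https%3A%2F%2Fwww.ebay.com%2Fitm%2F1234567890&country=us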
Here is our final code including both the crawler and the scraper.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
buy_it_now: bool = False
is_auction: bool = False
auction_end: str = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field