How to Scrape Yelp With Selenium
Yelp is a treasure trove of online reviews. Business owners depend on it, and Yelp reviewers are known for being brutally honest. Because of this, Yelp is a great place to gather data about different businesses, and it holds much larger datasets than most other review sites.
In this detailed tutorial, we'll go over how to scrape Yelp with Python and Selenium.
- TLDR: How to Scrape Yelp
- How To Architect Our Scraper
- Understanding How To Scrape Yelp
- Setting Up Our Yelp Scraper
- Build A Yelp Search Crawler
- Build A Yelp Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
TLDR - How to Scrape Yelp
Need a Yelp scraper, but don't have the time to read? Use the one below.
This scraper is ready to scrape restaurants from Yelp. If you need to scrape a different type of business, you'll need to make some tweaks, because Yelp uses different CSS selectors and layouts for different types of businesses. To run it, all you need to do is create a config.json file with your ScrapeOps API key and place it in the same folder as this script.
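For reference, the script only reads one value from that file, config["api_key"]. A minimal config.json (with a placeholder key) looks like this:

{
  "api_key": "YOUR-SCRAPEOPS-API-KEY"
}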
import os
import csv
import json
import logging
from time import sleep
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find_element(By.CSS_SELECTOR, "img")
title = img.get_attribute("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
rating = 0.0
has_rating = div_card.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']")
# Only convert the text to a float if a rating element is present and starts with a digit
if len(has_rating) > 0 and len(has_rating[0].text) > 0:
    rating_text = has_rating[0].text
    if rating_text[0].isdigit():
        rating = float(rating_text)
review_count = 0
if "review" in card_text:
    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
driver.implicitly_wait(10)
driver.get(get_scrapeops_url(url, location=location))
try:
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
info_section = json.loads(script.get_attribute("innerHTML"))
anon_count = 1
list_elements = info_section["itemListElement"]
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{anon_count}"
anon_count += 1
family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]
review_data = ReviewData(
name=name,
family_friendly=family_friendly,
date=date,
position=position
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
As mentioned above, to run this code, add a config.json
file with your ScrapeOps API key and place it in the same folder as this script. Feel free to tweak any of the following constants:
- MAX_RETRIES: Defines the maximum number of times the script will attempt to retry an operation (such as scraping data) in case of failure.
- MAX_THREADS: Sets the maximum number of threads that can run concurrently. It controls how many threads (i.e., parallel tasks) can be used for scraping or processing data.
- PAGES: Defines how many pages of search results should be scraped for each keyword.
- LOCATION: Specifies the location or country for the search query, which is used in the search URL.
If you decide to change the keyword_list, make sure to inspect the pages you're scraping first. The layout and CSS selectors might be different.
How To Architect Our Yelp Scraper
When building our Yelp scraper, there are a few things we need to keep in mind.
To start, we need to perform a search and extract data from the results. Afterward, we need to look up each of the businesses in those results individually.
We'll build a crawler to scrape the search results. Then, we'll create a scraper that looks up these businesses and scrapes data from their individual Yelp pages.
Our crawler needs to be able to do the following:
-
Perform a search and parse the results. When parsing the results, we extract the following variables:
- name: the name of the business.
- sponsored: a boolean variable. If the post is an ad, sponsored is True.
- stars: how many stars the business has based on overall reviews.
- rank: where the business shows up in our search results.
- review_count: the number of reviews the business has.
- url: the url of the business's Yelp page.
-
We should be able to paginate our search in order to control our results.
-
Once we've got control of our batches, we need to store the data we've extracted.
-
Perform steps 1 through 3 with concurrency, so we can scrape multiple pages of data simultaneously.
-
Integrate with the ScrapeOps Proxy API in order to get past any roadblocks the site may have in place.
Then, our scraper will need to perform these actions.
-
Load urls to scrape
-
Parse the Yelp page for each url, getting the following variables for each review:
- name: the name of the reviewer.
- family_friendly: whether or not they consider the business to be family friendly.
- date: the date that the review was uploaded.
- position: the position of the review on the page. For instance, the top review has a position of 1.
-
Store the extracted data.
-
Perform tasks 1 through 3 concurrently.
-
Integrate with the ScrapeOps Proxy API.
Understanding How To Scrape Yelp
Before we write our scraping code, we need to understand exactly how to get our information and how to extract it from the page. We'll use the ScrapeOps Proxy Aggregator API to handle our geolocation.
We'll go through these next few steps in order to plan out how to build our scraper.
Step 1: How To Request Yelp Pages
When you perform a search on Yelp, you get a URL that looks like this:
https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}
Let's search for restaurants in the US. This URL would look up restaurants in the us.
https://www.yelp.com/search?find_desc=restaurants&find_loc=us
find_desc is our search parameter and find_loc is our location.
Take a look at the image below and you can see for yourself.
Business pages on Yelp all have a URL path that comes after /biz/. We don't need to worry too much about these because we'll be extracting them straight from our search pages.
Take a look at the image below.
Step 2: How To Extract Data From Yelp Results and Pages
On the search results page, each business gets its own card. When we search for restaurants, each of these cards has a data-testid of serp-ia-card.
Once we can find these cards, we can go through and extract their information.
Take a look at the picture below so you can get a better understanding of this.
When dealing with businesses on Yelp, much of our review data gets embedded in a JSON blob on the page. Take a look below.
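In Selenium terms, these are the only two hooks we need. Here's a quick sketch, assuming driver is an already-created Chrome instance and By is imported from selenium.webdriver.common.by:

# One element per result card on a search page
cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
# The embedded review JSON on a business page
ld_json = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")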
Step 3: How To Control Pagination
We can control our pagination with the start param; we don't need to specify a page number in the URL. Each page holds 10 results, so we multiply our page number by 10, as sketched below the list.
- We start counting at 0, so page 1 (start=0) will give us results 1 through 10.
- Page 2 (start=10) will give us results 11 through 20... and so on and so forth.
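Here's a small sketch of how the full search URL comes together. build_search_url is just an illustrative helper, not part of the final script:

from urllib.parse import quote_plus

def build_search_url(keyword, location, page=0):
    # find_desc holds the keyword, find_loc the location, and start the result offset
    return (
        "https://www.yelp.com/search"
        f"?find_desc={quote_plus(keyword)}"
        f"&find_loc={quote_plus(location)}"
        f"&start={page * 10}"
    )

print(build_search_url("restaurants", "us", page=1))
# https://www.yelp.com/search?find_desc=restaurants&find_loc=us&start=10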
Step 4: Geolocated Data
When dealing with geolocation, we'll use both the ScrapeOps API and the location in the URL itself. So, when we search for restaurants in the US, we'll pass us in as our country to the ScrapeOps API, and we'll also pass it into the find_loc param of our Yelp URL.
Setting Up Our Yelp Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir yelp-scraper
cd yelp-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install selenium
Make sure you have a webdriver (e.g., ChromeDriver) installed! If you don't, download one that matches your Chrome version before moving on.
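A quick way to confirm your setup works is to launch a headless Chrome session before moving on. This is just a sanity check, not part of the scraper (recent Selenium versions can also fetch a matching driver for you automatically):

from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless")

# If this opens and quits without errors, Selenium and your Chrome driver are ready.
driver = webdriver.Chrome(options=options)
driver.get("https://www.yelp.com")
print(driver.title)
driver.quit()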
Build A Yelp Search Crawler
Time to get started on our crawler! As we build it, we'll add parsing, pagination, data storage, concurrency and proxy integration.
In the following sections, we'll go through and add these in step by step.
Step 1: Create Simple Search Data Parser
Let's build a basic parser. In this part, aside from parsing, we're going to add our imports and some basic retry logic.
import os
import csv
import json
import logging
from time import sleep
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
driver.get(url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find_element(By.CSS_SELECTOR, "img")
title = img.get_attribute("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
rating = 0.0
has_rating = div_card.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']")
# Only convert the text to a float if a rating element is present and starts with a digit
if len(has_rating) > 0 and len(has_rating[0].text) > 0:
    rating_text = has_rating[0].text
    if rating_text[0].isdigit():
        rating = float(rating_text)
review_count = 0
if "review" in card_text:
    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "").replace("https://www.yelp.com", "")
yelp_url = f"https://www.yelp.com{link}"
search_data = {
"name": title,
"sponsored": sponsored,
"stars": stars,
"rank": ranking,
"review_count": review_count,
"url": yelp_url
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
Some key points to notice here:
OPTIONS.add_argument("--headless")
sets our browser to run in headless mode. This saves valuable resources.- We start Selenium with the argument
options=OPTIONS
in order to ensure that we're always running in headless mode. - To find each business card on the page, we use
driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
- We use basic string formatting to find out if each result is sponsored,
sponsored = card_text[0].isdigit() == False
. Since all non sponsored items are ranked, all of them begin with a digit. - We also find its
img
and use itsalt
to pull the name of the business,img.get_attribute("alt")
. - If an item is not sponsored, we then split the string at
.
and pull the first element from list and convert it to an integer. - To find out if there is a rating present, we use the CSS selector,
"div span[data-font-weight='semibold']"
. If there is a rating present, we ectract it. - To find our review count, we also check if the word
review
is present. If it is, we once again use the.split()
method to extract the review count. - Finally, we pull our
a_element
and get itshref
to get the link to the page for each individual business.
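Here's a tiny worked example on a made-up card string. In the real scraper, card_text comes from div_card.text and title comes from the image's alt attribute:

card_text = "1. Joe's Diner\n4.5 (321 reviews)\n$$ Diners"
title = "Joe's Diner"

sponsored = card_text[0].isdigit() == False     # False: the card starts with its rank
ranking = None
if not sponsored:
    ranking = int(card_text.replace(title, "").split(".")[0])   # "1" -> 1

review_count = 0
if "review" in card_text:
    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]   # "321"

print(sponsored, ranking, review_count)   # False 1 321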
Step 2: Add Pagination
Adding pagination is actually very simple. We just make a slight change to our url. Our url will now include the start
parameter.
Take a look at the code below.
import os
import csv
import json
import logging
from time import sleep
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
driver.get(url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find_element(By.CSS_SELECTOR, "img")
title = img.get_attribute("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
rating = 0.0
has_rating = div_card.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']")
# Only convert the text to a float if a rating element is present and starts with a digit
if len(has_rating) > 0 and len(has_rating[0].text) > 0:
    rating_text = has_rating[0].text
    if rating_text[0].isdigit():
        rating = float(rating_text)
review_count = 0
if "review" in card_text:
    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "").replace("https://www.yelp.com", "")
yelp_url = f"https://www.yelp.com{link}"
search_data = {
"name": title,
"sponsored": sponsored,
"stars": stars,
"rank": ranking,
"review_count": review_count,
"url": yelp_url
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
As you can see, almost nothing has changed so far. Note that we also added a start_scrape()
function. At the moment, this doesn't do much other than give us the ability to scrape multiple pages.
Later on, we'll add concurrency to this function.
Step 3: Storing the Scraped Data
It's crucial that we store the data we scrape... otherwise there'd be no reason to scrape!
To store our search results, first we'll create a SearchData
class to hold our data.
Then this data will get passed into our DataPipeline
. This DataPipeline
pipes our data straight to a CSV file while removing duplicates.
First, take a look at our SearchData
.
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
And here is our DataPipeline
.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
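Here's a tiny standalone sketch of how SearchData and DataPipeline fit together. The filename and values are just examples:

pipeline = DataPipeline(csv_filename="example-restaurants.csv")
pipeline.add_data(SearchData(name="Joe's Diner", stars=4.5, rank=1, review_count="321", url="https://www.yelp.com/biz/joes-diner"))
pipeline.add_data(SearchData(name="Joe's Diner", stars=4.5, rank=1, review_count="321", url="https://www.yelp.com/biz/joes-diner"))  # duplicate name, gets dropped
pipeline.close_pipeline()  # flushes whatever is left in the queue to the CSV file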
After we put it all together, our script now looks like this.
import os
import csv
import json
import logging
from time import sleep
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
driver.get(url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find_element(By.CSS_SELECTOR, "img")
title = img.get_attribute("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
rating = 0.0
has_rating = div_card.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']")
# Only convert the text to a float if a rating element is present and starts with a digit
if len(has_rating) > 0 and len(has_rating[0].text) > 0:
    rating_text = has_rating[0].text
    if rating_text[0].isdigit():
        rating = float(rating_text)
review_count = 0
if "review" in card_text:
    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "").replace("https://www.yelp.com", "")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 4: Adding Concurrency
Remember when we added start_scrape()
earlier? Now it's time to add that concurrency. We'll use ThreadPoolExecutor
to scrape individual pages concurrently.
Take a look at this function refactored to use multithreading.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
In this function, our first argument is the function we want to run concurrently, scrape_search_results. All other arguments are the arguments passed into that function. We pass them in as arrays so executor.map can hand one element from each array to each individual call.
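If the argument arrays feel abstract, this is roughly what the mapping expands to for PAGES = 3 (just an illustration, not output from the script):

# scrape_search_results(keyword, location, 0, data_pipeline, retries)
# scrape_search_results(keyword, location, 1, data_pipeline, retries)
# scrape_search_results(keyword, location, 2, data_pipeline, retries)
# ...each call runs on one of the worker threads in the pool.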
Our full code now looks like this.
import os
import csv
import json
import logging
from time import sleep
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
driver.get(url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find_element(By.CSS_SELECTOR, "img")
title = img.get_attribute("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
rating = 0.0
has_rating = div_card.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']")
# Only convert the text to a float if a rating element is present and starts with a digit
if len(has_rating) > 0 and len(has_rating[0].text) > 0:
    rating_text = has_rating[0].text
    if rating_text[0].isdigit():
        rating = float(rating_text)
review_count = 0
if "review" in card_text:
    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "").replace("https://www.yelp.com", "")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 5: Bypassing Anti-Bots
Before we run our scraper in production, we need to add proxy integration. We'll be using the ScrapeOps Proxy API.
The function below is very simple but incredibly powerful.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
We pass the following arguments into get_scrapeops_url()
so it can return our proxied url with all these desired traits.
- url: the url we'd like to scrape.
- country: the country we want to be routed through by the API.
- residential: we want to use the residential proxy service. This greatly increases our chances of success when compared to a data center proxy.
- wait: we want ScrapeOps to wait 2 seconds for content to render before sending it back to us.
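As a quick example of how this gets used, the crawler simply wraps the target URL before handing it to Selenium. A sketch using the pieces defined above:

target_url = "https://www.yelp.com/search?find_desc=restaurants&find_loc=us&start=0"
driver = webdriver.Chrome(options=OPTIONS)
driver.get(get_scrapeops_url(target_url, location="us"))
# ...parse the page...
driver.quit()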
With our proxy fully integrated, here is our final crawler.
import os
import csv
import json
import logging
from time import sleep
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find_element(By.CSS_SELECTOR, "img")
title = img.get_attribute("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
rating = 0.0
has_rating = div_card.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']")
# Only convert the text to a float if a rating element is present and starts with a digit
if len(has_rating) > 0 and len(has_rating[0].text) > 0:
    rating_text = has_rating[0].text
    if rating_text[0].isdigit():
        rating = float(rating_text)
review_count = 0
if "review" in card_text:
    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Time to run our crawler in production and get a feel for performance. Take a look at our main
below. We'll be scraping 5 pages of search results.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Feel free to change any of the following constants in order to tweak results:
- MAX_THREADS
- MAX_RETRIES
- LOCATION
- PAGES
Here are our results from the production run.
Our crawler finished scraping 5 pages of results in 46.2 seconds. This gives us an average speed of 9.24 seconds per page.
Build A Yelp Scraper
We now have a working crawler that performs a search, extracts the results and then stores them to a CSV file. In this portion of the tutorial, we'll go through and build a scraper that:
- Reads our CSV file
- Looks up and parses urls from the file
- Extracts and stores the data we parse
- Does all of this using concurrency
- Integrates with the ScrapeOps proxy to avoid getting blocked
Step 1: Create Simple Business Data Parser
To start, we'll once again write a basic parsing function. This one is pretty similar to our first parser, with basic retries and error handling, but notice that this parser finds a script element on the page and pulls JSON data from that element.
Here is our process_business()
function.
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
driver.implicitly_wait(10)
driver.get(url)
try:
script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
info_section = json.loads(script.get_attribute("innerHTML"))
anon_count = 1
list_elements = info_section["itemListElement"]
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{anon_count}"
anon_count += 1
family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]
review_data = {
"name": name,
"family_friendly": family_friendly,
"date": date,
"position": position
}
print(review_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
- We find our script element with driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']").
- To convert this object into something we can work with, we use json.loads() on its innerHTML.
- Once we've loaded the JSON, we extract our data by simply reading items from the dict returned by json.loads().
This code won't run yet, we need a way to read our CSV file!
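To picture what that JSON looks like, here's a trimmed-down, made-up example of the ld+json structure the loop above walks (the field names match the ones used in process_business()):

info_section = {
    "itemListElement": [
        {
            "author": {"name": "Jane D."},
            "isFamilyFriendly": True,
            "uploadDate": "2024-03-01",
            "position": 1
        }
    ]
}

for element in info_section["itemListElement"]:
    print(element["author"]["name"], element["isFamilyFriendly"], element.get("uploadDate"), element["position"])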
Step 2: Loading URLs To Scrape
To use the function we wrote in the previous section, we need to be able to read our CSV file. We're going to write a function that goes through and reads the CSV file and then performs process_business()
on each of the rows from the file.
Here is our process_results()
function.
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
Once we put the whole thing together, here is what our code looks like.
import os
import csv
import json
import logging
from time import sleep
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find_element(By.CSS_SELECTOR, "img")
title = img.get_attribute("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
rating = 0.0
has_rating = div_card.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']")
# Only convert the text to a float if a rating element is present and starts with a digit
if len(has_rating) > 0 and len(has_rating[0].text) > 0:
    rating_text = has_rating[0].text
    if rating_text[0].isdigit():
        rating = float(rating_text)
review_count = 0
if "review" in card_text:
    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
driver.implicitly_wait(10)
driver.get(url)
try:
script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
info_section = json.loads(script.get_attribute("innerHTML"))
anon_count = 1
list_elements = info_section["itemListElement"]
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{anon_count}"
anon_count += 1
family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]
review_data = {
"name": name,
"family_friendly": family_friendly,
"date": date,
"position": position
}
print(review_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
Predictably, now we need to store our data. We already have most of the infrastructure we need. All we need is one more class, ReviewData
. It's almost identical to the SearchData
class from earlier. It just holds slightly different information.
Take a look at ReviewData
.
@dataclass
class ReviewData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Now that we've got this new class, we'll pass it into another DataPipeline
. This version of our script does exactly that.
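In isolation, the per-business pipeline pattern looks like this (filename and values are illustrative):

review_pipeline = DataPipeline(csv_filename="Joes-Diner.csv")
review_pipeline.add_data(ReviewData(name="Jane D.", family_friendly=True, date="2024-03-01", position=1))
review_pipeline.close_pipeline()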
import os
import csv
import json
import logging
from time import sleep
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find_element(By.CSS_SELECTOR, "img")
title = img.get_attribute("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
rating = 0.0
                has_rating = div_card.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']")
                # The rating is optional, so only parse it when the span exists and starts with a digit
                if len(has_rating) > 0 and len(has_rating[0].text) > 0 and has_rating[0].text[0].isdigit():
                    rating = float(has_rating[0].text)
review_count = 0
if "review" in card_text:
                    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
driver.implicitly_wait(10)
        try:
            driver.get(url)
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
info_section = json.loads(script.get_attribute("innerHTML"))
anon_count = 1
list_elements = info_section["itemListElement"]
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{anon_count}"
anon_count += 1
family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]
review_data = ReviewData(
name=name,
family_friendly=family_friendly,
date=date,
position=position
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 4: Adding Concurrency
Just like before, now that we're storing our data, we need to be able to add concurrency using ThreadPoolExecutor. Let's rewrite process_results() to do just that.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
Just like earlier, we:
- pass our parsing function in as the first argument
- pass the arguments to that function in as lists, one element per call
The sketch below shows how this argument mapping plays out.
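This is a minimal, self-contained example of that pattern, with process_row() acting as a hypothetical stand-in for process_business(). It shows how the repeated lists line up with the function's positional arguments.
import concurrent.futures

# Hypothetical stand-in for our real parsing function
def process_row(row, location, retries):
    print(f"Processing {row} in {location} with {retries} retries")

rows = ["row-1", "row-2", "row-3"]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Equivalent to calling process_row("row-1", "us", 3), process_row("row-2", "us", 3),
    # and process_row("row-3", "us", 3), but spread across the thread pool
    executor.map(
        process_row,
        rows,                 # one row per call (first positional argument)
        ["us"] * len(rows),   # the same location repeated for every call
        [3] * len(rows)       # the same retry count repeated for every call
    )
executor.map() takes the n-th element of each list and passes them together as one call, which is exactly what process_results() does with reader, the repeated location, and the repeated retries.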
Step 5: Bypassing Anti-Bots
We'll bypass any potential roadblocks exactly the way we did before. To do this, once again we'll use get_scrapeops_url(). All that changes is one line of our parsing function.
driver.get(get_scrapeops_url(url, location=location))
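If you're curious what that call actually requests, here's a small standalone snippet (with a placeholder API key) that builds the same style of proxied URL our get_scrapeops_url() function produces. The target Yelp URL is escaped by urlencode() and sent along as a single query parameter.
from urllib.parse import urlencode

# Placeholder values for illustration only
payload = {
    "api_key": "YOUR-API-KEY",
    "url": "https://www.yelp.com/search?find_desc=restaurants&find_loc=us&start=0",
    "country": "us",
    "residential": True,
    "wait": 2000
}

# urlencode() turns the dict into a query string and escapes the target URL
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))
Because the proxy fetches the Yelp page on our behalf, the href values we collect later contain proxy.scrapeops.io, which is why the parsing function strips that domain back out before rebuilding the Yelp URL.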
Here is our code now that it's ready for production.
import os
import csv
import json
import logging
from time import sleep
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
            sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find_element(By.CSS_SELECTOR, "img")
title = img.get_attribute("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
rating = 0.0
                has_rating = div_card.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']")
                # The rating is optional, so only parse it when the span exists and starts with a digit
                if len(has_rating) > 0 and len(has_rating[0].text) > 0 and has_rating[0].text[0].isdigit():
                    rating = float(has_rating[0].text)
review_count = 0
if "review" in card_text:
                    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
driver.implicitly_wait(10)
        try:
            driver.get(get_scrapeops_url(url, location=location))
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
info_section = json.loads(script.get_attribute("innerHTML"))
anon_count = 1
list_elements = info_section["itemListElement"]
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{anon_count}"
anon_count += 1
family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]
review_data = ReviewData(
name=name,
family_friendly=family_friendly,
date=date,
position=position
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Let's test out our scraper! Here is our final main. Once again, we'll start with a 5 page crawl.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here are the results.
Our full crawl and scrape finished in 10 minutes 23 seconds, or 623 seconds. If you remember earlier, the crawl took 46 seconds. 623 - 46 = 577 seconds spent scraping individual businesses.
We had 55 total restaurants scraped in the crawl. 577 seconds / 55 restaurants = 10.49 seconds per page. Quite a few of our results had to use retry logic, so the scraper could run considerably faster on runs where retries aren't needed.
Legal and Ethical Considerations
Every time you interact with a website, you are subject to its Terms of Service. Violating these terms can get you suspended or, even worse, permanently banned from the site.
Yelp's terms are available to read here. When using any sort of bot, such as a scraper, you also need to take a look at Yelp's robots.txt here.
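If you'd like to check a path programmatically, Python's built-in urllib.robotparser can read a robots.txt file for you. This is only a quick illustration; always review the full file and the Terms of Service yourself.
from urllib import robotparser

# Load and parse Yelp's robots.txt (illustrative only)
parser = robotparser.RobotFileParser()
parser.set_url("https://www.yelp.com/robots.txt")
parser.read()

# Ask whether a generic user agent may fetch a given path
print(parser.can_fetch("*", "https://www.yelp.com/search"))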
Scraping public information is generally considered legal. Public information on the web is any information that is not gated behind a login.
If you need to log in to view the data, it is considered private data. If you have questions about the legality of a scraping job, you should consult an attorney.
Conclusion
You've made it to the end! You now know how to extract data in multiple ways: pulling it directly from the page HTML and accessing it from embedded JSON. You also have a decent understanding of parsing, pagination, data storage, concurrency, and proxy integration.
To learn more about the tech stack used in this article, take a look at the links below.
More Selenium Web Scraping Guides
Here at ScrapeOps, we've got a ton of resources for you to learn from. You are never done learning. If you're interested in scraping other interesting sites, we've got guides for those as well.
Check out our Selenium Web Scraping Playbook.
Level up your scraping skills with one of the articles below!