How to Scrape Yelp With Requests and BeautifulSoup
Yelp is perhaps the best-known review site in the world. Ever since it became popular, Yelp ratings have been crucial to businesses, and owners will often go to great lengths to get bad reviews removed. Yelp is also a great place to quickly gather data about a business.
In this guide, we're going to go over how to scrape Yelp with Requests and BeautifulSoup.
- TLDR: How to Scrape Yelp
- How To Architect Our Scraper
- Understanding How To Scrape Yelp
- Setting Up Our Yelp Scraper
- Build A Yelp Search Crawler
- Build A Yelp Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Yelp
When we crawl Yelp search results, we pull deeply nested objects out of the page. When we scrape Yelp business pages, we can actually get quite a bit of important information from JSON blobs nested in the page.
The code below gives you a Yelp restaurant scraper, integrated with the ScrapeOps Residential Proxy and ready to go.
All you need to do is create a `config.json` file with your API key and place it in the same folder as this script.
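For reference, `config.json` only needs to hold your ScrapeOps API key under the `api_key` field that the script reads, e.g. `{"api_key": "YOUR-SCRAPEOPS-API-KEY"}` (placeholder value shown).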
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class RestaurantData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find("img")
title = img.get("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0
if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)
review_count = 0
if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
list_elements = info_section["itemListElement"]
unknown_count = 1
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{unknown_count}"
unknown_count += 1
family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]
restaurant_data = RestaurantData(
name=name,
family_friendly=family_friendly,
date=date,
position=position
)
review_pipeline.add_data(restaurant_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Feel free to change any of the following constants:
- `MAX_RETRIES`: the maximum number of times the script will retry an operation (such as a failed request) before giving up.
- `MAX_THREADS`: the maximum number of threads that can run concurrently, i.e. how many parallel tasks can be used for scraping or processing data.
- `PAGES`: how many pages of search results to scrape for each keyword.
- `LOCATION`: the location or country for the search query, which is also used in the search URL.
Use caution when changing the `keyword_list`. Yelp uses different CSS and layouts for different types of businesses.
How To Architect Our Yelp Scraper
Scraping Yelp will actually require two scrapers. First we need to build a crawler. The purpose of the crawler is relatively straightforward.
The crawler needs to perform the following tasks:
- Perform a search and parse the results. When parsing the results, we extract the following variables:
  - `name`: the name of the business.
  - `sponsored`: a boolean variable. If the post is an ad, `sponsored` is `True`.
  - `stars`: how many stars the business has based on overall reviews.
  - `rank`: where the business shows up in our search results.
  - `review_count`: the number of reviews the business has.
  - `url`: the url to the Yelp page for the business.
- We should be able to paginate our search in order to control our results.
- Once we've got control of our batches, we need to store the data we've extracted.
- Perform steps 1 through 3 with concurrency, so we can scrape multiple pages of data simultaneously.
- Integrate with the ScrapeOps Proxy API in order to get past any roadblocks the site may have in place.
Our scraper will perform these tasks:
- Load urls to scrape
- Parse the Yelp page for each url, getting the following variables for each review:
  - `name`: the name of the reviewer.
  - `family_friendly`: whether or not they consider the business to be family friendly.
  - `date`: the date that the review was uploaded.
  - `position`: the position of the review on the page. For instance, the top review would have a `position` of 1.
- Store the extracted data.
- Perform tasks 1 through 3 concurrently.
- Integrate with the ScrapeOps Proxy API.
Understanding How To Scrape Yelp
Before we write our scraping code, we need to understand exactly how to get our information and how to extract it from the page. We'll use the ScrapeOps Proxy Aggregator API to handle our geolocation.
We'll go through these next few steps in order to plan out how to build our scraper.
Step 1: How To Request Yelp Pages
When we search businesses on Yelp, we get a url that looks like this:
https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}
So if we wanted to search for restaurants in the US, our URL would look like this:
https://www.yelp.com/search?find_desc=restaurants&find_loc=us
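Because keywords and locations can contain spaces or other special characters, it can be safer to build this URL with `urlencode` instead of by hand. Here is a minimal sketch of the idea, using the same `find_desc` and `find_loc` parameters shown above (the `build_search_url` helper is just for illustration):
from urllib.parse import urlencode

def build_search_url(keyword, location):
    # find_desc is the search term, find_loc is the place we're searching in
    params = {"find_desc": keyword, "find_loc": location}
    return "https://www.yelp.com/search?" + urlencode(params)

print(build_search_url("restaurants", "us"))
# https://www.yelp.com/search?find_desc=restaurants&find_loc=us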
Take a look at the url in the search results page below:
Below is an image of an individual business page on Yelp. As you can see, the name of the business comes after `/biz/`. We don't have to worry too much about how these urls are constructed because we'll be extracting them straight from the search results.
Step 2: How To Extract Data From Yelp Results and Pages
In our search results, each business gets its own card on the page. Each of these cards has a `data-testid` of `serp-ia-card`. You can take a look below. By identifying these cards, we can go through and extract our needed information from each card.
On the individual business page, we're actually going to be pulling our data from a JSON blob embedded in the page.
Step 3: How To Control Pagination
As you may have noticed when inspecting the url in the picture earlier, we actually have a `start` param inside of it as well.
If we want to start at result 0 (the first page of results, results 1 through 10), we pass `start=0`. For the next page, we pass `start=10`.
This process repeats all the way down the line. Since our scraper numbers pages from 0, we can fetch any batch by multiplying the page number by 10.
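As a quick sketch, assuming zero-indexed page numbers like the scraper below uses, the offsets work out like this:
# each page of results is a batch of 10, so the offset is page_number * 10
for page_number in range(3):
    start = page_number * 10
    print(f"https://www.yelp.com/search?find_desc=restaurants&find_loc=us&start={start}")
# start=0 -> results 1-10, start=10 -> results 11-20, start=20 -> results 21-30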
Step 4: Geolocated Data
To handle geolocated data, we'll be using both the ScrapeOps API and the `find_loc` param. We could separate these into two unique variables, but for the purpose of this tutorial, we're going to use the same place.
For instance, if we're using a US based proxy server, we'll pass `us` in for our search location as well.
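A rough sketch of that decision, reusing the `country` and `find_loc` parameter names from the code in this guide (the API key is a placeholder):
from urllib.parse import urlencode

API_KEY = "YOUR-SCRAPEOPS-API-KEY"  # placeholder
location = "us"

# the same location string drives both the Yelp search...
target_url = f"https://www.yelp.com/search?find_desc=restaurants&find_loc={location}"
# ...and the proxy's geolocation
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode({
    "api_key": API_KEY,
    "url": target_url,
    "country": location
})
print(proxy_url)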
Setting Up Our Yelp Scraper Project
Let's get started. You can run the following commands to get setup.
Create a New Project Folder
mkdir yelp-scraper
cd yelp-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
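If you're on Windows, the activation command is `venv\Scripts\activate` instead.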
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build A Yelp Search Crawler
We'll get started building our crawler. Our crawler needs to incorporate parsing, pagination, data storage, concurrency and proxy integration into the design.
In the sections below, we'll go through this step by step.
Step 1: Create Simple Search Data Parser
To start, we'll create a parsing function. The goal of this function is simple:
- Perform a search
- Extract data from the page
Take a look at the example below. Aside from some logging and basic retry logic, all it really does is parse a search page.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find("img")
title = img.get("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])
has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0
if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)
review_count = 0
if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"
search_data = {
"name": title,
"sponsored": sponsored,
"stars": rating,
"rank": rank,
"review_count": review_count,
"url": yelp_url
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
In the code above, we:
- Find all of the business cards using their CSS selector: `div[data-testid='serp-ia-card']`.
- We use the `alt` text from the image to pull the `title` of the business.
- Decide whether the card is `sponsored` or an actual search result. We use `sponsored = card_text[0].isdigit() == False` because all ranked results have a number before their name.
- If the card is not `sponsored`, we use `rank_string = card_text.replace(title, "").split(".")` to split up the ranks. For instance, if the card text begins with `1. My Cool Business`, we split at the `.` and our rank would be 1 (see the short example after this list).
- We then find out if the business has a rating. If so, we pull it from the card with some more string splitting.
- We also use string splitting to get the review count.
- Finally, we find the `a_element` and extract its `href` to get the link to the Yelp page of the business.
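To make the string handling concrete, here is a toy example with a made-up card text (real card text is messier, but the logic is the same):
card_text = "1. My Cool Business4.5 (123 reviews)"
title = "My Cool Business"

sponsored = card_text[0].isdigit() == False            # False: ranked results start with a number
rank_string = card_text.replace(title, "").split(".")  # ['1', ' 4', '5 (123 reviews)']
ranking = int(rank_string[0])                          # 1
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]  # '123'
print(sponsored, ranking, review_count)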
Step 2: Add Pagination
Time to add pagination. We use pagination to control our batches of results.
To add pagination, we're going to add a `page_number` argument and change one part of our parsing function, the URL:
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
We'll also add a `start_scrape()` function which allows us to scrape multiple pages.
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
Now, let's put it all together:
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find("img")
title = img.get("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])
has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0
if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)
review_count = 0
if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"
search_data = {
"name": title,
"sponsored": sponsored,
"stars": rating,
"rank": rank,
"review_count": review_count,
"url": yelp_url
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
Step 3: Storing the Scraped Data
Scraping would be pointless if we didn't store the data.
In this section, we're going to add some functionality for basic data storage.
We'll start by adding a `SearchData` class. The purpose of this one is simply to hold our data while it's waiting to be stored.
Then, we'll add a `DataPipeline` class. The `DataPipeline` is extremely important: it takes all of our data and pipes it to a CSV file, and it also filters out duplicates.
Here is the `SearchData` class.
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is our `DataPipeline`.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
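If you want to sanity check these two classes on their own (using the same imports as the full script below), here is a quick usage sketch with a made-up record:
pipeline = DataPipeline(csv_filename="test-output.csv")
pipeline.add_data(SearchData(
    name="My Cool Business",
    sponsored=False,
    stars=4.5,
    rank=1,
    review_count="123",
    url="https://www.yelp.com/biz/my-cool-business"
))
pipeline.close_pipeline()
# test-output.csv now holds a header row plus the record above;
# adding a record with the same name again would be dropped as a duplicate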
Our fully updated code should look like this:
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find("img")
title = img.get("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])
has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0
if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)
review_count = 0
if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Key points to notice here:
- Instead of putting our data into a `dict`, we use it to create a `SearchData` object.
- Instead of printing our data to the terminal, we pass our `search_data` into the `data_pipeline`, which stores our data inside of a CSV file.
Step 4: Adding Concurrency
Concurrency can work wonders for any sort of repetitive task, and scraping a bunch of pages is incredibly repetitive. Let's use `ThreadPoolExecutor` to scrape multiple pages simultaneously on separate threads.
To do this, we only need to change one function, `start_scrape()`.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
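If `executor.map()` with multiple argument lists is new to you, here is a tiny standalone example of the pattern (nothing Yelp-specific, just an illustrative `greet()` function):
import concurrent.futures

def greet(name, punctuation):
    return f"Hello, {name}{punctuation}"

names = ["Alice", "Bob", "Carol"]
marks = ["!"] * len(names)  # repeat one value per call, just like [keyword] * pages above

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # runs greet("Alice", "!"), greet("Bob", "!"), greet("Carol", "!") across worker threads
    for result in executor.map(greet, names, marks):
        print(result)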
Instead of using a `for` loop, we pass `scrape_search_results` into `executor.map()` along with all of our arguments. Notice that each argument is now passed in as a list (or range) with one entry per page, and `executor.map()` lines them up call by call. Here is our fully updated crawler.
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find("img")
title = img.get("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])
has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0
if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)
review_count = 0
if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 5: Bypassing Anti-Bots
Anti-bots are one of the most difficult things to deal with when scraping. While they're designed to protect sites from malware and other malicious traffic, they tend to block scrapers as well.
The reason they block scrapers: scrapers don't appear to be human at all, and this makes them look suspicious.
To get past any roadblocks in our way, we'll make a simple function that unleashes the power of proxy integration.
The function below converts any URL into a ScrapeOps proxied URL.
- The `wait` parameter tells the ScrapeOps server to wait for two seconds before sending the page back to us.
- This allows the page content to load, and it also makes our traffic look a little less suspicious because we're not requesting 500 pages at the same time.
- You don't need to print the `proxy_url`, but when you're debugging, it comes in really handy. If your scraper fails on a page, you can simply open up the proxied page in your browser to see what went wrong!
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
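Using it is then just a matter of wrapping the target URL before handing it to `requests`. For example (assuming the imports and `API_KEY` setup from the scripts in this guide):
search_url = "https://www.yelp.com/search?find_desc=restaurants&find_loc=us"
response = requests.get(get_scrapeops_url(search_url, location="us"))
print(response.status_code)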
Here is our fully updated, production-ready crawler.
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find("img")
title = img.get("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])
has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0
if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)
review_count = 0
if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Time to test out our crawler in production. Here is our updated `main`. Feel free to change any of the constants here that you'd like. I changed `MAX_THREADS` to 4.
Exercise caution when changing your `keyword_list`. Yelp uses slightly different layouts and CSS for different types of businesses.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Here are the results:
The operation completed in 98 seconds, which comes out to roughly 20 seconds per page. Most of the time spent was waiting for the ScrapeOps server to send results back. This is actually not a bad thing.
When you fetch a website with ScrapeOps, ScrapeOps makes sure that you get a proper response, so if it fails the first time, they keep trying until they get it.
Just for the sake of testing it out, let's run it with the residential proxy. Simply add `"residential": True` to the payload in `get_scrapeops_url()`.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
"wait": 2000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
The residential proxy cut our runtime to 33 seconds. With the residential proxy, our crawler runs roughly three times as fast!
Build A Yelp Scraper
We've built a successful crawler. Now, we need to build our scraper. Our scraper needs to be able to do the following:
- Parse a business page
- Load urls from the CSV file generated by our crawler
- Store data extracted by the parser
- Parse multiple pages concurrently
- Bypass anti-bots with a proxy
While we build our scraper, we're going to utilize parsing, data storage, concurrency, and proxy integration.
Step 1: Create Simple Business Data Parser
Once again, we'll start with a parser. Take a look at the function below. It pulls up an individual business page and then fetches data from its reviews.
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
list_elements = info_section["itemListElement"]
unknown_count = 1
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{unknown_count}"
unknown_count += 1
family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]
restaurant_data = {
"name": name,
"family_friendly": family_friendly,
"date": date,
"position": position
}
print(restaurant_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
This function is very similar to the parsing function we used in the crawler. The key differences lie in where we pull the data.
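For orientation, the keys this parser touches sit in a structure roughly like the one below. This is a simplified, made-up example rather than Yelp's full schema; only the fields that `process_business()` actually reads are shown:
import json

# simplified stand-in for the ld+json blob embedded in a business page
info_section = json.loads("""
{
  "itemListElement": [
    {
      "author": {"name": "Jane D."},
      "isFamilyFriendly": true,
      "uploadDate": "2024-01-15",
      "position": 1
    }
  ]
}
""")

for element in info_section["itemListElement"]:
    print(element["author"]["name"], element["isFamilyFriendly"],
          element.get("uploadDate"), element["position"])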
- We find our JSON blob with `soup.select_one("script[type='application/ld+json']").text`.
- We then pull `name`, `family_friendly`, `date`, and `position` from the JSON.
Step 2: Loading URLs To Scrape
Without a way to read our CSV file, there's really no use for our parsing function. We need to load these urls and then parse them.
Here we'll make another function, `process_results()`. At the moment, it uses a `for` loop, but we'll add concurrency shortly.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
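For reference, each `row` that `csv.DictReader` hands to `process_business()` is just a dict of strings keyed by the crawler's CSV headers (the `SearchData` fields), roughly like this:
# example of a single row from the crawler's CSV, as parsed by csv.DictReader
row = {
    "name": "My Cool Business",
    "sponsored": "False",
    "stars": "4.5",
    "rank": "1",
    "review_count": "123",
    "url": "https://www.yelp.com/biz/my-cool-business"
}
print(row["url"])  # process_business() uses the url, and later the name for its output filename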
Here is our full code:
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find("img")
title = img.get("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0
if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)
review_count = 0
if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
list_elements = info_section["itemListElement"]
unknown_count = 1
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{unknown_count}"
unknown_count += 1
family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]
restaurant_data = {
"name": name,
"family_friendly": family_friendly,
"date": date,
"position": position
}
print(restaurant_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
As mentioned before, scraping is a pointless endeavor if we're not storing the data. We already have a `DataPipeline`; we just need to create another `dataclass`, `RestaurantData`.
Here is our new class:
@dataclass
class RestaurantData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Now, we'll update our code to use this class and pass it into a `DataPipeline`.
import os
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class RestaurantData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find("img")
title = img.get("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0
if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)
review_count = 0
if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
list_elements = info_section["itemListElement"]
unknown_count = 1
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{unknown_count}"
unknown_count += 1
family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]
restaurant_data = RestaurantData(
name=name,
family_friendly=family_friendly,
date=date,
position=position
)
review_pipeline.add_data(restaurant_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 4: Adding Concurrency
Now it's time to add concurrency. Once again, we'll be using executor.map(). As before, our first argument is our parsing function, and all subsequent arguments are arrays of arguments that get passed into that function.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
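If it helps to picture what executor.map() is doing here, it is roughly equivalent to the plain loop below, except that the calls are spread across max_threads worker threads. This is just a sketch for illustration and is not part of the scraper:

# Roughly what the executor.map() call above does, minus the threading:
for row, loc, allowed_retries in zip(reader, [location] * len(reader), [retries] * len(reader)):
    process_business(row, loc, retries=allowed_retries)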
Step 5: Bypassing Anti-Bots
Finally, it's time to add proxy support. Since the residential proxy made such a difference, we'll be using it here as well.
Once again, here's the proxy function, adjusted for residential:
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
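One more change worth calling out: process_business also needs to route its requests through the proxy. In the full code below, the plain requests.get(url) call becomes:

response = requests.get(get_scrapeops_url(url, location=location))

Without this, the business pages would still be fetched directly and wouldn't benefit from the residential proxy at all.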
Here is our production-ready code:
import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class RestaurantData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")
for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None
img = div_card.find("img")
title = img.get("alt")
if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])
has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0
if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)
review_count = 0
if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]
a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"
search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
list_elements = info_section["itemListElement"]
unknown_count = 1
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{unknown_count}"
unknown_count += 1
family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]
restaurant_data = RestaurantData(
name=name,
family_friendly=family_friendly,
date=date,
position=position
)
review_pipeline.add_data(restaurant_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Time to run the entire thing and see how it goes. Here is our updated main; the only change from before is bumping MAX_THREADS up to 4.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here are the results:
In total, the job took approximately 6 minutes 14 seconds, or 374 seconds. Subtracting the 33 seconds the crawler took earlier leaves roughly 341 seconds spent parsing businesses. The CSV generated by the crawler contained 58 businesses.
341 seconds / 58 businesses ≈ 5.88 seconds per page. This is very consistent with the crawler's results (roughly 6 seconds per page).
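If you want to sanity-check those numbers yourself, the math is simple enough to script. The figures below are the ones reported above, not constants you need anywhere in the scraper:

# Back-of-the-envelope throughput check for the production run.
total_seconds = 6 * 60 + 14      # 374 seconds for the whole job
crawl_seconds = 33               # time the crawler took on its own
businesses = 58                  # rows in the crawler's CSV

parse_seconds = total_seconds - crawl_seconds
print(f"{parse_seconds / businesses:.2f} seconds per business page")  # ~5.88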
Legal and Ethical Considerations
Whenever you scrape a website, you need to respect its policies. More specifically, you need to pay attention to both its Terms of Service and its robots.txt. You can view Yelp's terms here, and their robots.txt is available here.
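If you'd like to check a URL against Yelp's robots.txt programmatically before requesting it, Python's built-in urllib.robotparser handles the parsing for you. This is a minimal sketch; the user agent and path are just examples, and whether a path is allowed depends on whatever Yelp's robots.txt says at the time you run it:

from urllib.robotparser import RobotFileParser

robots = RobotFileParser()
robots.set_url("https://www.yelp.com/robots.txt")
robots.read()

# Check whether a generic crawler is allowed to fetch a sample search URL.
allowed = robots.can_fetch("*", "https://www.yelp.com/search?find_desc=restaurants")
print(f"Allowed to fetch search page: {allowed}")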
Remember, if you violate a site's policies, you can get suspended or even permanently banned from using their services.
Public data is usually fair game for scraping. When information is not gated behind a login or some other type of authentication, it is considered public. If you need to log in to access the information, it is considered private data.
If you have concerns about the legality of your scraper, consult an attorney.
Conclusion
You did it! You now know how to scrape Yelp. You have a solid understanding of parsing, pagination, data storage, concurrency, and proxy integration.
You also know how to use BeautifulSoup to extract page elements and you can easily parse through JSON blobs on a page.
More Python Web Scraping Guides
Here at ScrapeOps, we've got a ton of resources for you to learn from. You are never done learning. If you're interested in scraping other interesting sites, we've got guides for those as well.
Check out our Python Web Scraping Playbook.
Level up your scraping skills with one of the articles below!