

How to Scrape Yelp With Requests and BeautifulSoup

Yelp is perhaps the most well-known review site in the world. Since it became popular, Yelp ratings have been crucial to businesses, and owners will often go to great lengths to get bad reviews removed. All of this makes Yelp a great place to quickly gather data about a business.

In this guide, we're going to go over how to scrape Yelp with Requests and BeautifulSoup.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Yelp

When we crawl Yelp search results, we pull deeply nested objects out of the page. When we scrape Yelp business pages, we can actually get quite a bit of important information from JSON blobs nested in the page.

The code below gives you a Yelp restaurant scraper, integrated with the ScrapeOps Residential Proxy and ready to go.

All you need to do is create a config.json file containing your ScrapeOps API key and place it in the same folder as this script.
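For example, config.json just needs to hold your key under the api_key field (the value below is a placeholder):

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}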

import os
import csv
import requests
import json
import logging
import time # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class RestaurantData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")

for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None

img = div_card.find("img")
title = img.get("alt")

if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])

has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0

if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)

review_count = 0

if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"

search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
list_elements = info_section["itemListElement"]

unknown_count = 1
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{unknown_count}"
unknown_count += 1

family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]

restaurant_data = RestaurantData(
name=name,
family_friendly=family_friendly,
date=date,
position=position
)
review_pipeline.add_data(restaurant_data)

review_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Feel free to change any of the following constants:

  • MAX_RETRIES: Defines the maximum number of times the script will attempt to retry an operation (such as scraping data) in case of failure.
  • MAX_THREADS: Sets the maximum number of threads that can run concurrently. It controls how many threads (i.e., parallel tasks) can be used for scraping or processing data.
  • PAGES: Defines how many pages of search results should be scraped for each keyword.
  • LOCATION: Specifies the location or country for the search query, which is used in the search URL.

Use caution when changing the keyword_list. Yelp uses different CSS and layouts for different types of businesses.


How To Architect Our Yelp Scraper

Scraping Yelp will actually require two scrapers. First we need to build a crawler. The purpose of the crawler is relatively straightforward.

The crawler needs to perform the following tasks:

  1. Perform a search and parse the results. When parsing the results, we extract the following variables:
    • name: the name of the business.
    • sponsored: a boolean variable. If the post is an ad, sponsored is True.
    • stars: how many stars the business has based on overall reviews.
    • rank: where the business shows up in our search results.
    • review_count: the number of reviews the business has.
    • url: the URL of the business's Yelp page.
  2. We should be able to paginate our search in order to control our results.
  3. Once we've got control of our batches, we need to store the data we've extracted.
  4. Perform steps 1 through 3 with concurrency, so we can scrape multiple pages of data simultaneously.
  5. Integrate with the ScrapeOps Proxy API in order to get past any roadblocks the site may have in place.

Our scraper will perform these tasks:

  1. Load urls to scrape
  2. Parse the Yelp page for each url, getting the following variables for each review:
    • name: the name of the reviewer.
    • family_friendly: whether or not they consider the business to be family friendly.
    • date: the date that the review was uploaded.
    • position: the position of the review on the page. For instance, the top review would have the position of 1.
  3. Store the extracted data.
  4. Perform tasks 1 through 3 concurrently.
  5. Integrate with the ScrapeOps Proxy API.

Understanding How To Scrape Yelp

Before we write our scraping code, we need to understand exactly how to get our information and how to extract it from the page. We'll use the ScrapeOps Proxy Aggregator API to handle our geolocation.

We'll go through these next few steps in order to plan out how to build our scraper.


Step 1: How To Request Yelp Pages

When we search for businesses on Yelp, we get a URL that looks like this:

https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}

So if we wanted to search for restaurants in the US, our URL would look like this:

https://www.yelp.com/search?find_desc=restaurants&find_loc=us
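As a quick sketch of how the crawler later builds this URL, the keyword simply has its spaces replaced with + signs before being dropped into the query string (the keyword below is just an example):

keyword = "fish and chips"
location = "us"

formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}"
# https://www.yelp.com/search?find_desc=fish+and+chips&find_loc=us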

Take a look at the url in the search results page below:

Yelp Search Results

Below is an image of an individual business page on Yelp. As you can see, the name of the business comes after /biz/. We don't have to worry too much about how these urls are constructed because we'll be extracting them straight from the search results.

Yelp Business Pages


Step 2: How To Extract Data From Yelp Results and Pages

In our search results, each business gets its own card on the page. Each of these cards has a data-testid of serp-ia-card. You can take a look below. By identifying these cards, we can go through and extract our needed information from each card.

Yelp HTML Inspection Search Results

On the individual business page, we're actually going to be pulling our data from a JSON blob embedded in the page.

Yelp HTML Inspection Reviews
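Assuming html already holds the raw page source for whichever page we fetched, both lookups we just described come down to a couple of BeautifulSoup calls (a minimal sketch, not the full scraper):

from bs4 import BeautifulSoup
import json

# html is assumed to already hold the raw page source
soup = BeautifulSoup(html, "html.parser")

# Search results page: one card per business
div_cards = soup.select("div[data-testid='serp-ia-card']")

# Individual business page: the review data lives in an embedded JSON blob
info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)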


Step 3: How To Control Pagination

As you may have noticed when inspecting the URL in the picture earlier, the search URL also takes a start param.

If we want the first 10 results (page 0), we pass start=0. For the next page of results (page 1), we pass start=10.

This process repeats all the way down the line: to fetch each batch, we pass our page number multiplied by 10.
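In code, that just means multiplying our zero-indexed page number by 10 before plugging it into the start parameter:

for page_number in range(3):
    start = page_number * 10
    print(f"page {page_number} -> start={start}")

# page 0 -> start=0
# page 1 -> start=10
# page 2 -> start=20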


Step 4: Geolocated Data

To handle geolocated data, we'll use both the ScrapeOps country parameter and the find_loc param. We could keep these as two separate variables, but for the purposes of this tutorial, we're going to use the same value for both.

For instance, if we're using a US-based proxy server, we'll also pass us in as our search location.
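In other words, the same location value ends up in two places: Yelp's find_loc param and the proxy's country parameter. A small sketch (get_scrapeops_url() is defined later in this guide):

location = "us"

# Used in the Yelp search URL
url = f"https://www.yelp.com/search?find_desc=restaurants&find_loc={location}"

# And passed through to the ScrapeOps proxy as the country parameter
proxied_url = get_scrapeops_url(url, location=location)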


Setting Up Our Yelp Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir yelp-scraper

cd yelp-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate
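On Windows, run the following command instead:

venv\Scripts\activate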

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build A Yelp Search Crawler

We'll get started building our crawler. Our crawler needs to incorporate parsing, pagination, data storage, concurrency and proxy integration into the design.

In the sections below, we'll go through this step by step.


Step 1: Create Simple Search Data Parser

To start, we'll create a parsing function. The goal of this function is simple:

  1. Perform a search
  2. Extract data from the page

Take a look at the example below. Aside from some logging and basic retry logic, all it really does is parse a search page.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")

for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None

img = div_card.find("img")
title = img.get("alt")

if not sponsored:
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])

has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0

if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)

review_count = 0

if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"

search_data = {
"name": title,
"sponsored": sponsored,
"stars": rating,
"rank": rank,
"review_count": review_count,
"url": yelp_url
}

print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)


logger.info(f"Crawl complete.")

In the code above, we:

  • Find all of the business cards using their CSS Selector: "div[data-testid='serp-ia-card']"
  • We use the alt text from the image to pull the title of the business
  • Decide whether the card is sponsored or an actual search result. We use sponsored = card_text[0].isdigit() == False because all ranked results have a number before their name.
  • If the card is not sponsored, we use rank_string = card_text.replace(title, "").split(".") to split up the ranks. For instance, if the card title is 1. My Cool Business, we would split at . and our rank would be 1.
  • We then find out if the business has a rating. If so, we pull it from the card with some more string splitting.
  • We use more string splitting to get the review count as well (see the standalone example after this list).
  • Finally, we find the a_element and extract its href to get the link to the Yelp page of the business.
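To see that splitting in isolation, here is a tiny standalone example with a made-up card text (in the real scraper, card_text comes from div_card.text and title comes from the image's alt text):

title = "My Cool Business"
card_text = "1. My Cool Business4.5(123 reviews)"

# Rank: strip the title out, then split on the "." that follows the rank number
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])   # 1

# Review count: grab whatever sits between the parentheses
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]   # "123"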

Step 2: Add Pagination

Time to add pagination. We use pagination to control our batches of results.

To add pagination, we're going to add a page_number argument and change one part of our parsing function, the URL:

    url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"

We'll also add a start_scrape() function which allows us to scrape multiple pages.

def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)

Now, let's put it all together:

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")

for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None

img = div_card.find("img")
title = img.get("alt")

if not sponsored:
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])

has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0

if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)

review_count = 0

if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"

search_data = {
"name": title,
"sponsored": sponsored,
"stars": rating,
"rank": rank,
"review_count": review_count,
"url": yelp_url
}

print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

Step 3: Storing the Scraped Data

Scraping would be pointless if we didn't store the data.

In this section, we're going to add some functionality for basic data storage.

We'll start by adding a SearchData class. The purpose of this one is to simply hold our data while it's waiting to be stored.

Then, we'll add a DataPipeline class. The DataPipeline is extremely important. This class takes all of our data and pipes it to a CSV file. It also filters out our duplicates.

Here is the SearchData class.

@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

Here is our DataPipeline.

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

Our fully updated code should look like this:

import os
import csv
import requests
import json
import logging
import time # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")

for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None

img = div_card.find("img")
title = img.get("alt")

if not sponsored:
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])

has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0

if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)

review_count = 0

if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"

search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Key points to notice here:

  • Instead of putting our data into a dict, we use it to create a SearchData object.
  • Instead of printing our data to the terminal, we pass our search_data into the data_pipeline, which stores it inside of a CSV file (a quick usage sketch follows below).
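As a quick usage sketch (with made-up values), a pipeline's life cycle looks like this:

pipeline = DataPipeline(csv_filename="restaurants.csv")

pipeline.add_data(SearchData(
    name="My Cool Business",
    sponsored=False,
    stars=4.5,
    rank=1,
    review_count="123",
    url="https://www.yelp.com/biz/my-cool-business"
))

pipeline.close_pipeline()   # flush anything still waiting in the queue to the CSV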

Step 4: Adding Concurrency

Concurrency can work wonders for any sort of repetitive task. Scraping a bunch of pages is incredibly repetitive. Let's use ThreadPoolExecutor to scrape multiple pages simultaneously on separate threads.

To do this, we only need to change one function, start_scrape().

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

Instead of using a for loop, we pass scrape_search_results into executor.map() along with all of our arguments. Notice that we now pass each argument in as a list; executor.map() takes one iterable per parameter. Here is our fully updated crawler.

import os
import csv
import requests
import json
import logging
import time # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")

for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None

img = div_card.find("img")
title = img.get("alt")

if not sponsored:
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])

has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0

if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)

review_count = 0

if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"

search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 5: Bypassing Anti-Bots

Anti-bots are one of the most difficult things to deal with when scraping. While they're designed to protect sites from malware and other malicious traffic, they tend to block scrapers as well.

The reason they block scrapers: scrapers don't look like normal human traffic, and that makes them appear suspicious.

To get past any roadblocks in our way, we'll make a simple function that unleashes the power of proxy integration.

The function below converts any URL into a ScrapeOps proxied URL.

  • The wait parameter tells the ScrapeOps server to wait two seconds before sending the page back to us.
  • This allows the page content to load and it also makes our traffic look a little less suspicious because we're not requesting 500 pages at the same time.
  • You don't need to print the proxy_url, but when you're debugging, it comes in really handy.

If your scraper fails on a page, you can simply open up the proxied page in your browser to see what went wrong!

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url

Here is our fully updated, production-ready crawler.

import os
import csv
import requests
import json
import logging
import time # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")

for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None

img = div_card.find("img")
title = img.get("alt")

if not sponsored:
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])

has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0

if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)

review_count = 0

if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"

search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

Time to test out our crawler in production. Here is our updated main. Feel free to change any of the constants here that you'd like. I changed MAX_THREADS to 4.

Exercise caution when changing your keyword_list. Yelp uses slightly different layouts and CSS for different types of businesses.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Here are the results:

Yelp Crawler Performance

The operation completed in 98 seconds, which comes out to roughly 20 seconds per page. Most of the time spent was waiting for the ScrapeOps server to send results back. This is actually not a bad thing.

When you fetch a website with ScrapeOps, ScrapeOps makes sure that you get a proper response, so if it fails the first time, they keep trying until they get it.

Just for the sake of testing it out, let's run it with the residential proxy. Simply add "residential": True to the payload in get_scrapeops_url().

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
"wait": 2000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url

ScrapeOps Residential Proxy Performance

The residential proxy cut our run time down to 33 seconds. With the residential proxy, our crawler runs roughly 3 times as fast!


Build A Yelp Scraper

We've built a successful crawler. Now, we need to build our scraper. Our scraper needs to be able to do the following:

  • Parse a business page
  • Load urls from the CSV file generated by our crawler
  • Store data extracted by the parser
  • Parse multiple pages concurrently
  • Bypass anti-bots with a proxy

While we build our scraper, we're going to utilize parsing, data storage, concurrency, and proxy integration.


Step 1: Create Simple Business Data Parser

Once again, we'll start with a parser. Take a look at the function below. It pulls up an individual business page and then fetches data from its reviews.

def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
list_elements = info_section["itemListElement"]

unknown_count = 1
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{unknown_count}"
unknown_count += 1

family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]

restaurant_data = {
"name": name,
"family_friendly": family_friendly,
"date": date,
"position": position
}
print(restaurant_data)

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")

This function is very similar to the parsing function we used in the crawler. The key differences lie in where we pull the data.

  • We find our JSON blob with soup.select_one("script[type='application/ld+json']").text
  • We then pull name, family_friendly, date, and position from the JSON (the sketch below shows the rough shape of that blob).
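For reference, the fields the parser reads imply a blob shaped roughly like this (a trimmed, hypothetical example; the real ld+json blob contains far more):

info_section = {
    "itemListElement": [
        {
            "author": {"name": "Unknown User"},
            "isFamilyFriendly": True,
            "uploadDate": "2024-01-01",
            "position": 1
        },
        # ...one entry per review
    ]
}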

Step 2: Loading URLs To Scrape

Without a way to read our CSV file, there's really no use for our parsing function. We need to load these urls and then parse them.

Here we'll make another function, process_results().

At the moment, it uses a for loop, but we'll add concurrency shortly.

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_business(row, location, retries=retries)

Here is our full code:

import os
import csv
import requests
import json
import logging
import time # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")

for div_card in div_cards:
card_text = div_card.text
sponsored = card_text[0].isdigit() == False
ranking = None

img = div_card.find("img")
title = img.get("alt")

if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])

has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0

if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)

review_count = 0

if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"

search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
list_elements = info_section["itemListElement"]

unknown_count = 1
for element in list_elements:
name = element["author"]["name"]
if name == "Unknown User":
name = f"{name}{unknown_count}"
unknown_count += 1

family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]

restaurant_data = {
"name": name,
"family_friendly": family_friendly,
"date": date,
"position": position
}
print(restaurant_data)

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_business(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

As mentioned before, scraping is a pointless endeavor if we're not storing the data. We already have a DataPipeline; we just need to create another dataclass, RestaurantData.

Here is our new class:

@dataclass
class RestaurantData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

Now, we'll update our code to use this class and pass it into a DataPipeline.

import os
import csv
import requests
import json
import logging
import time # needed for time.sleep() in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class RestaurantData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")

for div_card in div_cards:
card_text = div_card.text
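# Organic result cards start with their rank number (e.g. "1."), so a non-digit first character indicates a sponsored card.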
sponsored = card_text[0].isdigit() == False
ranking = None

img = div_card.find("img")
title = img.get("alt")

if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])

has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0

if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)

review_count = 0

if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"

search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
list_elements = info_section["itemListElement"]

unknown_count = 1
for element in list_elements:
name = element["author"]["name"]
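# Yelp reports anonymous reviewers as "Unknown User"; numbering them keeps the pipeline's duplicate filter from dropping their reviews.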
if name == "Unknown User":
name = f"{name}{unknown_count}"
unknown_count += 1

family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]

restaurant_data = RestaurantData(
name=name,
family_friendly=family_friendly,
date=date,
position=position
)
review_pipeline.add_data(restaurant_data)

review_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_business(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 4: Adding Concurrency

Now it's time to add concurrency. Once again, we'll use executor.map(). As before, the first argument is our parsing function.

All subsequent arguments are lists that get passed into that parsing function, one item from each list per call.

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)

Step 5: Bypassing Anti-Bots

Finally, it's time to add proxy support. Since the residential proxy made such a difference earlier, we'll use it here as well.

Once again, here's the proxy function adjusted for residential.

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url
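
As a quick sanity check, you can point the wrapper at any URL; the search URL below is just an example, and the snippet relies on the script's existing imports and API key.

target = "https://www.yelp.com/search?find_desc=restaurants&find_loc=us&start=0"
response = requests.get(get_scrapeops_url(target, location="us"))
print(response.status_code)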

Here is our production ready code:

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
import time
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
sponsored: bool = False
stars: float = 0
rank: int = 0
review_count: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class RestaurantData:
name: str = ""
family_friendly: bool = False
date: str = ""
position: int = 0


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
#possibly need to urlencode location: city + state + zip code
url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='serp-ia-card']")

for div_card in div_cards:
card_text = div_card.text
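# Organic result cards start with their rank number (e.g. "1."), so a non-digit first character indicates a sponsored card.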
sponsored = card_text[0].isdigit() == False
ranking = None

img = div_card.find("img")
title = img.get("alt")

if not sponsored:
rank_string = card_text.replace(title, "").split(".")
if len(rank_string) > 0:
ranking = int(rank_string[0])

has_rating = div_card.select_one("div span[data-font-weight='semibold']")
rating = 0.0

if len(has_rating.text) > 0:
if has_rating.text[0].isdigit():
rating = float(has_rating.text)

review_count = 0

if "review" in card_text:
review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

a_element = div_card.find("a")
link = a_element.get("href")
yelp_url = f"https://www.yelp.com{link}"

search_data = SearchData(
name=title,
sponsored=sponsored,
stars=rating,
rank=ranking,
review_count=review_count,
url=yelp_url
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
list_elements = info_section["itemListElement"]

unknown_count = 1
for element in list_elements:
name = element["author"]["name"]
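# Yelp reports anonymous reviewers as "Unknown User"; numbering them keeps the pipeline's duplicate filter from dropping their reviews.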
if name == "Unknown User":
name = f"{name}{unknown_count}"
unknown_count += 1

family_friendly = element["isFamilyFriendly"]
date = element.get("uploadDate")
position = element["position"]

restaurant_data = RestaurantData(
name=name,
family_friendly=family_friendly,
date=date,
position=position
)
review_pipeline.add_data(restaurant_data)

review_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Time to run the entire thing and see how it goes. Here is our updated main.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 4
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Here are the results:

Scraper Performance

In total, the job took approximately 6 minutes and 14 seconds, or 374 seconds. Subtracting the 33 seconds the crawler took earlier leaves roughly 341 seconds spent parsing businesses. The CSV generated by the crawler contained 58 businesses.

341 seconds / 58 businesses ≈ 5.88 seconds per page. This is very consistent with the crawler's results (roughly 6 seconds per page).
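
If you want to double-check the math, it only takes a few lines; the numbers come from this particular run.

total_seconds = 6 * 60 + 14    # 374 seconds for the whole job
crawl_seconds = 33             # crawl time measured earlier
businesses = 58                # rows in the crawler's CSV

parse_seconds = total_seconds - crawl_seconds   # 341 seconds spent on business pages
print(round(parse_seconds / businesses, 2))     # 5.88 seconds per page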


Whenever you scrape a website, you need to respect its policies. More specifically, pay attention to both its Terms of Service and its robots.txt. You can view Yelp's terms here. Their robots.txt is available here.
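
If you'd rather check a path programmatically before crawling it, Python's built-in urllib.robotparser can read Yelp's robots.txt for you; the user agent below is just a placeholder for whatever your scraper actually sends.

from urllib.robotparser import RobotFileParser

parser = RobotFileParser()
parser.set_url("https://www.yelp.com/robots.txt")
parser.read()

# Placeholder user agent string -- substitute your own.
print(parser.can_fetch("my-scraper", "https://www.yelp.com/search?find_desc=restaurants"))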

Remember, if you violate a site's policies, you can get suspended or even permanently banned from using their services.

Public data is usually fair game for scraping. When information is not gated behind a login or some other type of authentication, it is considered public. If you need to log in to access the information, it is considered private data.

If you have concerns about the legality of your scraper, consult an attorney.


Conclusion

You did it! You now know how to scrape Yelp. You have a solid understanding of parsing, pagination, data storage, concurrency, and proxy integration.

You also know how to use BeautifulSoup to extract page elements and you can easily parse through JSON blobs on a page.
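
As a refresher, pulling one of those JSON blobs out of a page boils down to a couple of lines with BeautifulSoup and the json module. The helper below is a minimal sketch, assuming the page contains an application/ld+json script like Yelp's business pages do.

import json
from bs4 import BeautifulSoup

def extract_ld_json(html):
    # Grab the first application/ld+json script tag and parse its contents as JSON.
    soup = BeautifulSoup(html, "html.parser")
    script = soup.select_one("script[type='application/ld+json']")
    return json.loads(script.text) if script else None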


More Python Web Scraping Guides

Here at ScrapeOps, we've got a ton of resources for you to learn from. You're never done learning. If you're interested in scraping other interesting sites, we've got guides for those as well.

Check out our Python Web Scraping Playbook.

Level up your scraping skills with one of the articles below!