
Scrape Google Reviews With Python Requests and BeautifulSoup

How to Scrape Google Reviews With Requests and BeautifulSoup

Scraping Google Reviews is notoriously difficult. To start, Google Reviews are only accessible via Google Maps. On top of that, Google uses dynamic CSS selectors, the data gets loaded dynamically, and all of it is incredibly nested.

Today, we'll learn how to crawl Google Maps and then retrieve the reviews for each business found in our crawl. This information is incredibly useful, especially when you want to collect aggregate data on different businesses.


TLDR - How to Scrape Google Reviews

To scrape Google Reviews, we need to crawl Google Maps and create a list of businesses. Then, we need to look up each business from the list and save its reviews. The code below does exactly this. To get started:

  1. Create a new project folder with a config.json file.
  2. Then add your ScrapeOps API key to the file, {"api_key": "your-super-secret-api-key"}.
  3. Copy/paste the code below into a new Python file and you're good to go!
  4. Run it with python name_of_your_python_file.py.
import os
import re
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    time_left: str = ""
    review_shortened: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0

                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")


        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )


def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                main_card = soup.select_one("div[role='main']")

                info_cards = soup.find_all("div", class_="MyEned")
                review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
                for card in info_cards:
                    review = card.text

                    full_card = card.parent.parent.parent.parent
                    reviewer_button = full_card.find("button")
                    name = reviewer_button.get("aria-label").replace("Photo of ", "")
                    rating_tag = full_card.select_one("span[role='img']")
                    stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
                    review_date = rating_tag.parent.find_all("span")[-1].text

                    review_data = ReviewData(
                        name=name,
                        stars=stars,
                        time_left=review_date,
                        review_shortened=review
                    )
                    review_pipeline.add_data(review_data)
                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

To customize your results, change any of the following:

  • MAX_RETRIES: the maximum number of retries for a failed parse.
  • MAX_THREADS: how many threads you'd like to use when parsing pages simultaneously.
  • LOCATION: the location you'd like to appear from.
  • LOCALITIES: the areas of the map you'd like to scrape. They need to be added in as latitude and longitude pairs.
  • keyword_list: the keywords you'd like to search the map for.

When you change your localities, you need to use latitude and longitude pairs.
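
For example, a hypothetical tweak of those settings might look like the snippet below; the coordinates and keywords are placeholders, so swap in your own.

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "uk"                                # route requests through the UK
LOCALITIES = ["51.5,-0.12", "53.48,-2.24"]     # central London and Manchester as lat,long pairs

keyword_list = ["coffee shop", "pizza"]        # one crawl (and one CSV file) per keyword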


How To Architect Our Google Reviews Scraper

As mentioned above, to get Google Reviews, we need to crawl Google Maps. Maps itself is really tricky to scrape. We need a locality (latitude and longitude), and we need to wait for dynamic content to load on the screen before getting our result.

If our proxy server properly loads our page, we can extract the following information for each business in the search: name, stars, url, and rating_count. We'll save these to a CSV file, and our review scraper will then go through and find reviews for each of these businesses.

Our Maps crawler will need to do the following:

  • Lookup businesses in a certain locality and parse the results.
  • Store the parsed data inside a CSV file.
  • Concurrently parse multiple localities at once.
  • Integrate with a proxy to get past Google's anti-bot systems.

After our crawl, the Reviews scraper needs to perform these tasks:

  • Read the CSV from the crawl into an array.
  • Parse reviews from each business extracted during the crawl.
  • Store review data for each business.
  • Concurrently parse and store this data.
  • Use proxy integration to get past anything that might block us.

Understanding How To Scrape Google Reviews

When we scrape Google Reviews, our data gets loaded dynamically. On top of that, it is incredibly nested within the page. Let's get a better understanding of how exactly to get the pages that contain our data. Then, we'll take a look at where we need to pull the data from.


Step 1: How To Request Google Reviews Pages

As with any scraping job, we need to begin with a GET request. If you're unfamiliar with HTTP, a GET request simply asks a server for information.

  • When you navigate to a site in your browser, your browser performs what's called a GET request to the server.
  • Your browser receives a response back in the form of an HTML page.
  • With Python Requests (our HTTP client), we'll perform that same GET request.
  • The big difference is how we handle the HTML response.
  • Instead of rendering the page for us to view (like the browser does), we'll code our scraper to actually dig through the HTML for the information.

If you look below, you can view an example search for the word "restaurant". Here is our URL:

https://www.google.com/maps/place/Leo's+Coney+Island/@42.3937072,-83.4828338,17z/data=!4m6!3m5!1s0x8824acedc1b6f397:0xaa85d06de541a352!8m2!3d42.3937072!4d-83.4828338!16s%2Fg%2F1tf299fd?authuser=0&hl=en&entry=ttu&g_ep=EgoyMDI0MDkwOC4wIKXMDSoASAFQAw%3D%3D

@42.3937072,-83.4828338 is our latitude and longitude.

Google Maps Search Results
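
To make that URL pattern concrete, here is a minimal sketch that builds a Maps search URL from a keyword and a latitude/longitude pair (the same format our crawler uses below) and fetches it with a plain GET. Without a proxy and a rendering wait, Google usually returns very little useful data, so treat this purely as an illustration.

import requests

keyword = "restaurant"
locality = "42.3,-83.5"  # latitude,longitude

# Same search URL format used by scrape_search_results() later in this article.
url = f"https://www.google.com/maps/search/{keyword.replace(' ', '+')}/@{locality},14z/data=!3m1!4b1?entry=ttu"

response = requests.get(url)
print(response.status_code)           # 200 if the request went through
print(len(response.text), "bytes")    # raw HTML; most of the page is loaded dynamically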

When we lookup a specific restaurant, we get a super similar page. We get our map, and along with it, we get a section of the page containing the business information and reviews.

Reviews


Step 2: How To Extract Data From Google Reviews Results and Pages

As you just learned, we start with a GET request. The next question is: what do we do with the page once we've gotten it? We need to dig through the HTML and pull the data out of it. Let's take a look at the pages we just visited and see where the data is located inside the HTML.

On the search page, each restaurant has an a tag with a link to the restaurant information.

Search page HTML Inspection

On the individual business page, the actual reviews are embedded within a div with a class of MyEned. Once we find this element, we can climb up through its parent elements, and from the correct parent we can pull all of the other information we need.

Business HTML Inspection
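
As a rough sketch of that selection logic, the snippet below uses a made-up HTML fragment (the real Google Maps markup is far more nested and uses dynamic class names) to show how BeautifulSoup grabs a review container by its class and then walks up through parent to reach the surrounding card.

from bs4 import BeautifulSoup

# A simplified, made-up fragment -- the real markup is far more nested.
html = """
<div class="card">
  <button aria-label="Photo of Jane Doe"></button>
  <span role="img" aria-label="5 stars"></span>
  <div><div><div><div class="MyEned">Great food and friendly staff.</div></div></div></div>
</div>
"""

soup = BeautifulSoup(html, "html.parser")
review_div = soup.find("div", class_="MyEned")
print(review_div.text)                                # the visible review text

full_card = review_div.parent.parent.parent.parent    # climb up to the full card
print(full_card.find("button").get("aria-label"))     # "Photo of Jane Doe"
print(full_card.select_one("span[role='img']").get("aria-label"))  # "5 stars"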


Step 3: Geolocated Data

There are two things we need to do in order to handle geolocation.

  1. To start, when we search businesses on Google Maps, we're searching a specific locality using its latitude and longitude. When you're interacting with Google Maps you don't pay much attention to this, but these coordinates are saved in the URL. Think back to the latitude and longitude in the URL from earlier, @42.3937072,-83.4828338.

  2. On top of the locality we wish to search, we need to handle the actual location we want to appear in on Google's servers. To take care of this, we can use the country param with the ScrapeOps Proxy Aggregator.

  • If you want to appear in the US, you can pass {"country": "us"} to ScrapeOps.

You can view a full list of supported countries here.
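
As a small illustration, here is roughly what the proxied URL looks like when you route the same target through two different countries; the API key and target URL below are placeholders.

from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"
target_url = "https://www.google.com/maps/search/restaurant"

for country in ["us", "uk"]:
    payload = {"api_key": API_KEY, "url": target_url, "country": country, "wait": 5000}
    print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))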


Setting Up Our Google Reviews Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir google-reviews-scraper

cd google-reviews-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4
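
Create a config.json File

The scripts in this article load your ScrapeOps API key from a config.json file in the project folder (the same format shown in the TLDR section). You can create it by hand, or with a quick Python one-off like the sketch below; the key shown is a placeholder.

import json

# Write a minimal config.json holding your ScrapeOps API key (placeholder value).
with open("config.json", "w") as config_file:
    json.dump({"api_key": "your-super-secret-api-key"}, config_file)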

Build A Google Reviews Search Crawler

To get started, we need a list of businesses and their URLs. To accomplish this, we'll build a crawler that performs a search and saves a list of businesses. We'll go through several iterations and build our crawler in the following steps:

  1. Perform a search and parse the results.
  2. Store those results safely in a CSV file.
  3. Run steps 1 and 2 on multiple localities with concurrency.
  4. Use proxy integration to help control our geolocation and bypass anti-bots.

Step 1: Create Simple Search Data Parser

We need to start by creating a simple search parser. In the code example below, we set up our basic structure. This code contains error handling, retry logic, and our parsing function, scrape_search_results().

Pay close attention to the parsing logic going on in this script.

import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0

                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = {
                    "name": name,
                    "stars": rating,
                    "url": maps_link,
                    "rating_count": rating_count
                }

                print(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")


        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, retries=3):
    for locality in localities:
        scrape_search_results(keyword, location, locality, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        start_scrape(keyword, LOCATION, LOCALITIES, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")
  • First, we find all of the business links, business_links = soup.select("div div a").
  • We filter out all of our unwanted links.
  • We retrieve the name of each business with business_link.get("aria-label").
  • business_link.get("href") gives us the link to each business.
  • We then find the parent element of the business link, full_card = business_link.parent.
  • full_card.select_one("span[role='img']") finds our rating holder.
  • We use basic string splitting to separate the rating from the review count, and we convert the count to an integer (see the example below).
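
For example, assuming the rating holder's text comes back in the form "4.5(1,234)" (the exact value here is made up), the split works roughly like this:

# Hypothetical rating text in the form "<stars>(<review count>)"
rating_text = "4.5(1,234)"

rating_array = rating_text.split("(")
rating = rating_array[0]                                               # "4.5"
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))  # 1234

print(rating, rating_count)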

Step 2: Storing the Scraped Data

Now, to store our data. Without data storage, our crawl would be pretty useless. Our goal is to store all the data extracted from the crawl inside a nice, neat CSV file.

  1. First, we'll create a dataclass to represent our search results. Then, we need a pipeline to a CSV.
  2. This pipeline should also filter out duplicate results so we're not wasting our precious resources looking things up twice when we scrape the reviews.

Here is our dataclass. We'll call it SearchData.

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here is our DataPipeline. This class opens a pipe to a CSV file and filters out duplicates using their name attribute.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

When we put everything together, we open a new DataPipeline and pass it into start_scrape(). It then gets passed into scrape_search_results(). Instead of finding and printing our data as a dict object, we create a SearchData object and pass it into our DataPipeline.

import os
import re
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)

success = True
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
for locality in localities:
scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • SearchData is used to represent individual search results from our crawl.
  • DataPipeline is used to pipe all of our SearchData objects to a CSV file and remove the duplicates, as shown in the sketch below.
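
To see how the two fit together in isolation, here is a small sketch with made-up values; it assumes the SearchData and DataPipeline classes defined above are already in scope.

# Minimal usage sketch -- the values below are placeholders.
pipeline = DataPipeline(csv_filename="example-businesses.csv")

pipeline.add_data(SearchData(name="Example Diner", stars=4.5, url="https://www.google.com/maps/place/example", rating_count=1200))
pipeline.add_data(SearchData(name="Example Diner", stars=4.5, url="https://www.google.com/maps/place/example", rating_count=1200))  # duplicate name: dropped

pipeline.close_pipeline()  # flushes whatever is left in the queue to example-businesses.csv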

Step 3: Adding Concurrency

Adding concurrency is really easy if you know what you're doing. start_scrape() already allows us to crawl a list of different localities.

To crawl this list concurrently, we just need to refactor start_scrape() and replace the for loop with something a little more powerful. We'll do this using ThreadPoolExecutor. This opens up a new pool of threads and runs our parsing function on each thread concurrently.

Here is our old version of start_scrape().

def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
    for locality in localities:
        scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries)

You can see the new and improved version in the snippet below.

def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )

executor.map() is the portion that actually replaces the for loop. Take a look at the args:

  • scrape_search_results: the function we want to call.
  • [keyword] * len(localities): our keyword passed in as a list.
  • [location] * len(localities): our location passed in as a list.
  • localities: the list of localities we'd like to crawl.
  • [data_pipeline] * len(localities): our DataPipeline object passed in as a list.
  • [retries] * len(localities): our retry limit passed in as a list.

As you probably noticed, all the arguments to our parsing function get passed in as lists. executor.map() takes these lists and passes them into a bunch of separate instances of our parsing function.
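
If the repeated-list style of executor.map() feels unfamiliar, this tiny standalone example shows the same pattern with a toy function:

import concurrent.futures

def greet(greeting, name):
    return f"{greeting}, {name}!"

names = ["restaurant A", "restaurant B", "restaurant C"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each call gets one element from every list: greet("Hello", "restaurant A"), and so on.
    results = executor.map(greet, ["Hello"] * len(names), names)

print(list(results))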


Step 4: Bypassing Anti-Bots

Anti-bots can be the Achilles' heel of any web scraping project. With Google Maps and Reviews, not only do we need to bypass anti-bots, but we also need to wait for our content to render.

We need to tell ScrapeOps Proxy Aggregator the following four things when making our requests:

  • "api_key": your ScrapeOps API key.
  • "url": the url we want to scrape.
  • "country": the country we want our request to be routed through. This parameter uses a location of our choice when we make the request.
  • "wait": how long to wait before sending our response. This allows the content to render on their end before we get it back.

The function below incorporates all of the information above and returns a proxied ScrapeOps URL.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
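
Using it is just a matter of wrapping the target URL before handing it to Requests; the target below is a placeholder and the helper above is assumed to be in scope.

# ScrapeOps fetches the target, waits 5 seconds for it to render, then returns the HTML.
target_url = "https://www.google.com/maps/search/restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu"
response = requests.get(get_scrapeops_url(target_url, location="us"))
print(response.status_code)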

The code below holds our production-ready Maps crawler. After creating our proxy function, we simply use it during the parse.

import os
import re
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)

success = True
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

We now have reliable proxy support and we're ready to scrape at scale.


Step 5: Production Run

Time to run our crawler in production! If you need to view it in closer detail, here is our main block.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

To change your results, you can change any of the following:

  • MAX_RETRIES: the maximum number of retries for a failed parse.
  • MAX_THREADS: how many threads you'd like to use when parsing pages simultaneously.
  • LOCATION: the location you'd like to appear from.
  • LOCALITIES: the areas of the map you'd like to scrape. They need to be added in as latitude and longitude pairs.
  • keyword_list: the keywords you'd like to search the map for.

Here are the results from our crawl. We crawled 3 different localities in 12.88 seconds: 12.88 seconds / 3 pages = roughly 4.29 seconds per page.

Crawler Results Terminal


Build A Google Reviews Scraper

Now that we're scraping businesses and generating a list with their URLs, we need to read that list and do something with it. We don't want to work through it manually; we want a scraper that reads the list and then scrapes reviews for each business in it using its URL.

Time to add more features. We'll add them in the following order.

  1. Parse business reviews.
  2. Read the CSV file.
  3. Store the review data.
  4. Concurrently run steps 1 through 3 until the entire list of businesses has been processed.
  5. Use proxy integration once again to bypass anti-bots and render the content we'd like to scrape.

If you followed along and built the crawler, the following sections will seem pretty familiar.


Step 1: Create Simple Business Data Parser

Just as before, we'll start with a basic parsing function that includes error handling and retry logic. Pay close attention to how we extract the data here.

def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                main_card = soup.select_one("div[role='main']")

                info_cards = soup.find_all("div", class_="MyEned")
                for card in info_cards:
                    review = card.text

                    full_card = card.parent.parent.parent.parent
                    reviewer_button = full_card.find("button")
                    name = reviewer_button.get("aria-label").replace("Photo of ", "")
                    rating_tag = full_card.select_one("span[role='img']")
                    stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
                    review_date = rating_tag.parent.find_all("span")[-1].text

                    review_data = {
                        "name": name,
                        "stars": stars,
                        "time_left": review_date,
                        "review_shortened": review
                    }

                    print(review_data)

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
  • We find all the info_card items: info_cards = soup.find_all("div", class_="MyEned").
  • We then iterate through them.
  • We pull the visible review: review = card.text.
  • Use the parent attribute to find the full review card that includes the reviewer name and rating: full_card = card.parent.parent.parent.parent.
  • reviewer_button = full_card.find("button") finds the button that holds information about our reviewer.
  • We find the user's name with the aria-label attribute: name = reviewer_button.get("aria-label").replace("Photo of ", ""). We also strip "Photo of " from the string so that the only information we save is the reviewer's name (see the worked example after this list).
  • We follow a similar method to the one above when extracting our rating: int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", "")).
  • review_date = rating_tag.parent.find_all("span")[-1].text finds all the span tags descended from the parent of our rating_tag. The last element is our review date, so we pull index -1 from the array.
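
As a quick worked example of those string manipulations (the literal attribute values here are assumptions about what Google typically returns):

# Hypothetical attribute values shaped like the ones the parser expects.
reviewer_label = "Photo of Jane Doe"
rating_label = "5 stars"

name = reviewer_label.replace("Photo of ", "")                        # "Jane Doe"
stars = int(rating_label.replace(" stars", "").replace(" star", ""))  # 5

print(name, stars)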

Step 2: Loading URLs To Scrape

Next, we need to read the URLs that we scraped during the crawl. We'll create another function similar to start_scrape(). This one needs to read our CSV file into an array of dict objects.

Then, it should iterate through the array and call our parsing function on each row we read from the file.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)

When we put it all together, it looks like this.

import os
import re
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)

success = True
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)


def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[role='main']")

info_cards = soup.find_all("div", class_="MyEned")
for card in info_cards:
review = card.text

full_card = card.parent.parent.parent.parent
reviewer_button = full_card.find("button")
name = reviewer_button.get("aria-label").replace("Photo of ", "")
rating_tag = full_card.select_one("span[role='img']")
stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
review_date = rating_tag.parent.find_all("span")[-1].text

review_data = {
"name": name,
"stars": stars,
"time_left": review_date,
"review_shortened": review
}

print(review_data)

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_business(row, location, retries=retries)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

We already have a DataPipeline class. This makes our new storage really easy to implement. We just need to pass a dataclass into a DataPipeline. This new class will be used to represent reviews from the page.

Take a look at ReviewData; it's almost identical to SearchData.

@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    time_left: str = ""
    review_shortened: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In the full code below, we open a new DataPipeline from inside our parsing function. Then, as we extract our data, we convert it into ReviewData. That ReviewData then gets passed into the DataPipeline as we parse it.

import os
import re
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
stars: int = 0
time_left: str = ""
review_shortened: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)

success = True
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)


def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[role='main']")

info_cards = soup.find_all("div", class_="MyEned")
review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
for card in info_cards:
review = card.text

full_card = card.parent.parent.parent.parent
reviewer_button = full_card.find("button")
name = reviewer_button.get("aria-label").replace("Photo of ", "")
rating_tag = full_card.select_one("span[role='img']")
stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
review_date = rating_tag.parent.find_all("span")[-1].text

review_data = ReviewData(
name=name,
stars=stars,
time_left=review_date,
review_shortened=review
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_business(row, location, retries=retries)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 4: Adding Concurrency

For concurrency, we're going to use ThreadPoolExecutor just like we did before. We'll replace the for loop in process_results() with some more powerful, multithreaded code.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
  • process_business is the function we want to call on all threads.
  • All other arguments get passed in as lists, just like before.

Step 5: Bypassing Anti-Bots

We've already got our polished proxy function. All we need to do is use it in the right place. One line of our parsing function changes and everything is ready to go.

response = requests.get(get_scrapeops_url(url, location=location))

Here is our final code containing both the crawler and the scraper.

import os
import re
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
stars: int = 0
time_left: str = ""
review_shortened: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)

success = True
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)


def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
try:
# Make the request inside the try block so network errors also trigger a retry
response = requests.get(get_scrapeops_url(url, location=location))
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[role='main']")

info_cards = soup.find_all("div", class_="MyEned")
review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
for card in info_cards:
review = card.text

full_card = card.parent.parent.parent.parent
reviewer_button = full_card.find("button")
name = reviewer_button.get("aria-label").replace("Photo of ", "")
rating_tag = full_card.select_one("span[role='img']")
stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
review_date = rating_tag.parent.find_all("span")[-1].text

review_data = ReviewData(
name=name,
stars=stars,
time_left=review_date,
review_shortened=review
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Now it's time to test the whole thing in production. You can view our updated main below.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

If you remember from earlier, our crawl took 12.88 seconds.

The entire run for the crawl and scrape took 91.158 seconds, and the crawl generated a report with 22 results. 91.158 - 12.88 = 78.278 seconds spent scraping reviews. 78.278 seconds / 22 businesses ≈ 3.558 seconds per page.

This is right on par with our crawler speed from earlier.
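
Your numbers will vary with location, proxy latency, and thread count. If you'd like to reproduce the measurement yourself, a simple (hypothetical) timing wrapper around the two phases of main is enough; the sketch below uses time.perf_counter() and assumes business_count comes from your own crawl CSV.

import time

crawl_start = time.perf_counter()
# ... run start_scrape() and close_pipeline() for each keyword here ...
crawl_time = time.perf_counter() - crawl_start

scrape_start = time.perf_counter()
# ... run process_results() for each aggregate file here ...
scrape_time = time.perf_counter() - scrape_start

business_count = 22  # taken from the crawl CSV in this run
print(f"Crawl: {crawl_time:.2f}s, scrape: {scrape_time:.2f}s, "
      f"{scrape_time / business_count:.3f}s per business")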

Scraper Results Terminal


Legal and Ethical Considerations

Any time you scrape the web, you need to pay attention to what you're doing. If you're scraping public data (data not gated behind a login) like we did in this article, it is typically legal no matter what country you live in.

However, private data is a completely different story. If you decide to scrape private data, make sure you understand the laws and regulations that govern that data, because you are subject to them.

While our scrape was legal, it potentially violates the Google Maps Terms of Service and robots.txt. Violating these can lead to suspension or even deletion of your account. You can view these documents from Google below.


Conclusion

In conclusion, scraping Google Reviews is a tricky task. It requires us to crawl Google Maps to obtain a list of businesses, and then build a second scraper for the reviews of each business.

On top of all that, the reviews are rendered dynamically, so we used the ScrapeOps Headless Browser to render the page before parsing it. To follow along, you should have a solid grasp of Python Requests and BeautifulSoup, and you should also understand parsing, data storage, concurrency, and proxy integration.

If you're interested in the tech we used when building this project and writing this article, look at the links below.


More Python Web Scraping Guides

Here at ScrapeOps, we wrote the playbook on scraping with Python. No matter what your skill level is, we've got something for you.

To learn more from our "How To Scrape" series, check out the links below!