How to Scrape Google Reviews With Requests and BeautifulSoup
Scraping Google Reviews is notoriously difficult. To start, Google Reviews are only accessible via Google Maps. On top of that, Google uses dynamic CSS selectors, the data gets loaded dynamically, and all of it is incredibly nested.
Today, we'll learn how to crawl Google Maps and then we'll learn how to get the reviews for each business found in our crawl. This information is incredibly useful, especially when you wish to collect aggregate data on different businesses.
- TLDR: How to Scrape Google Reviews
- How To Architect Our Scraper
- Understanding How To Scrape Google Reviews
- Setting Up Our Google Reviews Scraper
- Build A Google Reviews Search Crawler
- Build A Google Reviews Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Google Reviews
To scrape Google Reviews, we need to crawl Google Maps and create a list of businesses. Then, we need to look up each business from the list and save the reviews. The code below does exactly this. To get started:
- Create a new project folder with a `config.json` file.
- Add your ScrapeOps API key to the file: `{"api_key": "your-super-secret-api-key"}`.
- Copy/paste the code below into a new Python file and you're good to go!
- Run it with `python name_of_your_python_file.py`.
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
stars: int = 0
time_left: str = ""
review_shortened: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent
rating_holder = full_card.select_one("span[role='img']")
rating = 0.0
rating_count = 0
if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)
success = True
logger.info(f"Successfully parsed data from: {url}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[role='main']")
info_cards = soup.find_all("div", class_="MyEned")
review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
for card in info_cards:
review = card.text
full_card = card.parent.parent.parent.parent
reviewer_button = full_card.find("button")
name = reviewer_button.get("aria-label").replace("Photo of ", "")
rating_tag = full_card.select_one("span[role='img']")
stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
review_date = rating_tag.parent.find_all("span")[-1].text
review_data = ReviewData(
name=name,
stars=stars,
time_left=review_date,
review_shortened=review
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5"]
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To customize your results, change any of the following:
- `MAX_RETRIES`: the max amount of retries for a parse.
- `MAX_THREADS`: how many threads you'd like to use when parsing pages simultaneously.
- `LOCATION`: the location you'd like to appear from.
- `LOCALITIES`: the areas of the map you'd like to scrape. They need to be added in as latitude and longitude pairs.
- `keyword_list`: the keywords you'd like to search the map for.
When you change your localities, you need to use latitude and longitude pairs.
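For example, the `LOCALITIES` list used throughout this article passes each locality as a comma-separated "latitude,longitude" string:

```python
# Each entry is a "latitude,longitude" pair that gets dropped straight into the Maps URL.
LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
```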
How To Architect Our Google Reviews Scraper
As mentioned above, to get Google Reviews, we need to crawl Google Maps. Maps itself is really tricky to scrape. We need a locality (latitude and longitude), and we need to wait for dynamic content to load on the screen before getting our result.
If our proxy server properly loads our page, we can extract the following information for each business in the search: `name`, `stars`, `url`, and `rating_count`. We'll save these to a CSV file, and then our review scraper will go through and find reviews for each of these businesses.
Our Maps crawler will need to do the following:
- Lookup businesses in a certain locality and parse the results.
- Store the parsed data inside a CSV file.
- Concurrently parse multiple localities at once.
- Integrate with a proxy to get past Google's anti-bot systems.
After our crawl, the Reviews scraper needs to perform these tasks:
- Read the CSV from the crawl into an array.
- Parse reviews from each business extracted during the crawl.
- Store review data for each business.
- Concurrently parse and store this data.
- Use proxy integration to get past anything that might block us.
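Put together, the two phases above boil down to a crawl step that writes one CSV per keyword, followed by a review-scrape step that reads that CSV back. Here's a rough sketch of that flow using the functions we'll build in this article:

```python
# Rough flow only; DataPipeline, start_scrape() and process_results() are built step by step below.
for keyword in ["restaurant"]:
    crawl_pipeline = DataPipeline(csv_filename=f"{keyword}.csv")
    start_scrape(keyword, "us", ["42.3,-83.5"], data_pipeline=crawl_pipeline)  # crawl Maps search results
    crawl_pipeline.close_pipeline()                                            # flush businesses to restaurant.csv
    process_results(f"{keyword}.csv", "us")                                    # scrape reviews for each business found
```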
Understanding How To Scrape Google Reviews
When we scrape Google Reviews, our data gets loaded dynamically. On top of that, it is incredibly nested within the page. Let's get a better understanding of how exactly to get the pages that contain our data. Then, we'll take a look at where we need to pull the data from.
Step 1: How To Request Google Reviews Pages
As with any scraping job, we need to begin with a GET request. If you're unfamiliar with HTTP, we perform a GET to get information.
- When you navigate to a site in your browser, your browser performs what's called a GET request to the server.
- Your browser receives a response back in the form of an HTML page.
- With Python Requests (our HTTP client), we'll perform that same GET request.
- The big difference is how we handle the HTML response.
- Instead of rendering the page for us to view (like the browser does), we'll code our scraper to actually dig through the HTML for the information.
If you look below, you can view an example search for the word "restaurant". Here is our URL:
https://www.google.com/maps/place/Leo's+Coney+Island/@42.3937072,-83.4828338,17z/data=!4m6!3m5!1s0x8824acedc1b6f397:0xaa85d06de541a352!8m2!3d42.3937072!4d-83.4828338!16s%2Fg%2F1tf299fd?authuser=0&hl=en&entry=ttu&g_ep=EgoyMDI0MDkwOC4wIKXMDSoASAFQAw%3D%3D
`@42.3937072,-83.4828338` is our latitude and longitude.
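As a minimal sketch, the request itself is a plain GET with Python Requests against the Maps search URL format used throughout this article. Without a proxy that waits for the JavaScript to render, the returned HTML is usually of limited use, which is exactly why we add ScrapeOps later on:

```python
import requests

keyword = "restaurant"
locality = "42.3,-83.5"  # "latitude,longitude"
url = f"https://www.google.com/maps/search/{keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"

# A plain GET request; your browser does the same thing when you visit the page.
response = requests.get(url)
print(response.status_code)   # 200 if Google served us a page
print(len(response.text))     # raw HTML; most of the interesting content loads dynamically
```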
When we look up a specific restaurant, we get a super similar page. We get our map, and along with it, we get a section of the page containing the business information and reviews.
Step 2: How To Extract Data From Google Reviews Results and Pages
As you just learned, we start with a GET request. The next question is: what do we do with the page once we've gotten it? We need to dig through the HTML and pull the data out of the page. Let's take a look at the pages we just visited and see where the data is located inside the HTML.
On the search page, each restaurant has an `a` tag with a link to the restaurant information.
On the individual business page, the actual reviews are embedded within a `div` with a class of `MyEned`. Once we find this element, we can find its `parent` elements. Once we've found the correct `parent` element, we can find all of the other information we need.
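Here's a minimal sketch of that extraction pattern with BeautifulSoup. The `MyEned` class and the chain of `.parent` hops mirror the scraper code later in this article; the `html` argument is assumed to be the fully rendered page source:

```python
from bs4 import BeautifulSoup

def extract_review_cards(html: str):
    """Pull the review text and rating label out of each review card on a business page."""
    soup = BeautifulSoup(html, "html.parser")
    results = []
    # Each visible review body sits inside a div with the class "MyEned".
    for card in soup.find_all("div", class_="MyEned"):
        review_text = card.text
        # Walk up to the full review card, which also holds the reviewer button and the rating.
        full_card = card.parent.parent.parent.parent
        rating_tag = full_card.select_one("span[role='img']")
        results.append((review_text, rating_tag.get("aria-label") if rating_tag else None))
    return results
```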
Step 3: Geolocated Data
There are two things we need to do in order to handle geolocation.
- To start, when we search businesses on Google Maps, we're searching based on a specific locality using its latitude and longitude. When you're interacting with Google Maps, you're not paying attention to that part, but these coordinates are actually saved in our URL. Think back to the latitude and longitude in the URL from earlier, `@42.3937072,-83.4828338`.
- On top of the locality we wish to search, we need to handle the actual location we want to appear in on Google's servers. To take care of this, we can use the `country` param with the ScrapeOps Proxy Aggregator.
  - If you want to appear in the US, you can pass `{"country": "us"}` to ScrapeOps.
You can view a full list of supported countries here.
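As a preview of what that looks like in code (the full `get_scrapeops_url()` helper is built in the proxy-integration step later on), the country simply goes into the query-string payload sent to the ScrapeOps endpoint:

```python
from urllib.parse import urlencode

# Placeholder values for illustration; the real helper fills these in from config and arguments.
payload = {
    "api_key": "YOUR_SCRAPEOPS_API_KEY",
    "url": "https://www.google.com/maps/search/restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu",
    "country": "us",   # the country we want the request routed through
    "wait": 5000,      # give the dynamic content time to render before the HTML comes back
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
```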
Setting Up Our Google Reviews Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir google-reviews-scraper
cd google-reviews-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
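If you want to confirm the install worked before moving on, a quick import check inside the activated environment is enough:

```python
# Run inside the activated virtual environment.
import requests
import bs4

print(requests.__version__, bs4.__version__)
```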
Build A Google Reviews Search Crawler
To get started, we need to get a list of businesses and their urls. To accomplish this, we need to build a crawler that performs a search and saves a list of businesses. We're going to go through several iterations and build our crawler in the following steps:
- Perform a search and parse the results.
- Store those results safely in a CSV file.
- Run steps 1 and 2 on multiple localities with concurrency.
- Use proxy integration to help control our geolocation and bypass anti-bots.
Step 1: Create Simple Search Data Parser
We need to start by creating a simple search parser. In the code example below, we set up our basic structure. This code contains error handling, retry logic, and our parsing function, `scrape_search_results()`.
Pay close attention to the parsing logic going on in this script.
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, locality, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent
rating_holder = full_card.select_one("span[role='img']")
rating = 0.0
rating_count = 0
if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
search_data = {
"name": name,
"stars": rating,
"url": maps_link,
"rating_count": rating_count
}
print(search_data)
success = True
logger.info(f"Successfully parsed data from: {url}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, retries=3):
for locality in localities:
scrape_search_results(keyword, location, locality, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, LOCATION, LOCALITIES, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- First, we find all of the business links: `business_links = soup.select("div div a")`.
- We filter out all of our unwanted links.
- We retrieve the name of each business with `business_link.get("aria-label")`.
- `business_link.get("href")` gives us the link to each business.
- We then find the `parent` element of the business link: `full_card = business_link.parent`.
- `full_card.select_one("span[role='img']")` finds our rating holder.
- We use basic string splitting to extract the rating, and we convert the review count to an integer (see the quick example below).
Step 2: Storing the Scraped Data
Now, to store our data. Without data storage, our crawl would be pretty useless. Our goal is to store all the data extracted from the crawl inside a nice, neat CSV file.
- First, we'll create a `dataclass` to represent our search results. Then, we need a pipeline to a CSV.
- This pipeline should also filter out duplicate results so we're not wasting our precious resources looking things up twice when we scrape the reviews.
Here is our `dataclass`. We'll call it `SearchData`.
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is our `DataPipeline`. This class opens a pipe to a CSV file and filters out duplicates using their `name` attribute.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
When we put everything together, we open a new `DataPipeline` and pass it into `start_scrape()`. It then gets passed into `scrape_search_results()`. Instead of finding and printing our data as a `dict` object, we create a `SearchData` object and pass it into our `DataPipeline`.
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent
rating_holder = full_card.select_one("span[role='img']")
rating = 0.0
rating_count = 0
if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)
success = True
logger.info(f"Successfully parsed data from: {url}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
for locality in localities:
scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- `SearchData` is used to represent individual search results from our crawl.
- `DataPipeline` is used to pipe all of our `SearchData` objects to a CSV file and remove the duplicates (see the short usage sketch below).
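Here's a quick standalone sketch of how the two classes work together, outside of the crawler (the values below are made up for illustration):

```python
# Illustrative values only; in the crawler these come from the parsed search page.
pipeline = DataPipeline(csv_filename="restaurant.csv")

pipeline.add_data(SearchData(
    name="Example Diner",
    stars=4.5,
    url="https://www.google.com/maps/place/example",
    rating_count=321,
))
pipeline.add_data(SearchData(name="Example Diner"))  # duplicate name, logged and dropped

pipeline.close_pipeline()  # flushes whatever is left in the queue to restaurant.csv
```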
Step 3: Adding Concurrency
Adding concurrency is really easy if you know what you're doing. `start_scrape()` already allows us to crawl a list of different localities.
To crawl this list concurrently, we just need to refactor `start_scrape()` and replace the `for` loop with something a little more powerful. We'll do this using `ThreadPoolExecutor`. This opens up a new pool of threads and runs our parsing function on each thread concurrently.
Here is our old version of `start_scrape()`.
def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
for locality in localities:
scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries)
You can see the new and improved version in the snippet below.
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)
`executor.map()` is the portion that actually replaces the `for` loop. Take a look at the args:
- `scrape_search_results`: the function we want to call.
- `[keyword] * len(localities)`: our keyword passed in as a list.
- `[location] * len(localities)`: our location passed in as a list.
- `localities`: the list of localities we'd like to crawl.
- `[data_pipeline] * len(localities)`: our `DataPipeline` object passed in as a list.
- `[retries] * len(localities)`: our retry limit passed in as a list.

As you probably noticed, all the arguments to our parsing function get passed in as lists. `executor.map()` takes these lists and passes them into a bunch of separate instances of our parsing function.
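If the list-of-arguments pattern looks strange, this toy example shows how `executor.map()` lines the lists up, taking one element from each list per call:

```python
import concurrent.futures

def greet(greeting, name):
    return f"{greeting}, {name}!"

names = ["Anna", "Ben", "Cara"]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Calls greet("Hello", "Anna"), greet("Hello", "Ben"), greet("Hello", "Cara") across the threads.
    results = list(executor.map(greet, ["Hello"] * len(names), names))

print(results)  # ['Hello, Anna!', 'Hello, Ben!', 'Hello, Cara!']
```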
Step 4: Bypassing Anti-Bots
Anti-bots can be the Achilles heel of any web scraping project. With Google Maps and Reviews, not only do we need to bypass anti-bots, but we also need to wait for our content to render.
We need to tell ScrapeOps Proxy Aggregator the following four things when making our requests:
- `"api_key"`: your ScrapeOps API key.
- `"url"`: the url we want to scrape.
- `"country"`: the country we want our request to be routed through. This parameter uses a location of our choice when we make the request.
- `"wait"`: how long to wait before sending our response. This allows the content to render on their end before we get it back.
The function below incorporates all of the information above and returns a proxied ScrapeOps URL.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
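For example, calling the helper on a Maps search URL just returns the ScrapeOps endpoint with everything URL-encoded into the query string (output truncated for readability):

```python
proxied = get_scrapeops_url(
    "https://www.google.com/maps/search/restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu",
    location="us",
)
print(proxied)
# Roughly: https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.google.com%2Fmaps%2F...&country=us&wait=5000
```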
The code below holds our production-ready Maps crawler. After creating our proxy function, we simply use it during the parse.
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent
rating_holder = full_card.select_one("span[role='img']")
rating = 0.0
rating_count = 0
if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)
success = True
logger.info(f"Successfully parsed data from: {url}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
We now have reliable proxy support and we're ready to scrape at scale.
Step 5: Production Run
Time to run our crawler in production! If you need to view it in closer detail, here is our `main`.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
To change your results, you can change any of the following:
- `MAX_RETRIES`: the max amount of retries for a parse.
- `MAX_THREADS`: how many threads you'd like to use when parsing pages simultaneously.
- `LOCATION`: the location you'd like to appear from.
- `LOCALITIES`: the areas of the map you'd like to scrape. They need to be added in as latitude and longitude pairs.
- `keyword_list`: the keywords you'd like to search the map for.
Here are the results from our crawl. We crawled 3 different localities in 12.88 seconds: 12.88 seconds / 3 pages = roughly 4.29 seconds per page.
Build A Google Reviews Scraper
Now that we're scraping businesses and generating a list with their urls, we need to read that list and do something with it. We don't just want to read it manually. We want a scraper that reads the list and then scrapes reviews for each business in the list using its url.
Time to add more features. We'll add the following features in order.
- Parsing business reviews.
- Read the CSV file.
- Store the review data.
- Concurrently run steps 1 and 3 until the entire list of businesses has been processed.
- Proxy Integration will once again be used to bypass anti-bots and render the content we'd like to scrape.
If you followed along and built the crawler, the following sections will seem pretty familiar.
Step 1: Create Simple Business Data Parser
Just as before, we'll start with a basic parsing function that includes error handling and retry logic. Pay close attention to how we extract the data here.
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
        response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[role='main']")
info_cards = soup.find_all("div", class_="MyEned")
for card in info_cards:
review = card.text
full_card = card.parent.parent.parent.parent
reviewer_button = full_card.find("button")
name = reviewer_button.get("aria-label").replace("Photo of ", "")
rating_tag = full_card.select_one("span[role='img']")
stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
review_date = rating_tag.parent.find_all("span")[-1].text
review_data = {
"name": name,
"stars": stars,
"time_left": review_date,
"review_shortened": review
}
print(review_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
- We find all the `info_card` items: `info_cards = soup.find_all("div", class_="MyEned")`.
- We then iterate through them.
- We pull the visible review: `review = card.text`.
- Use the `parent` attribute to find the full review card that includes the reviewer name and rating: `full_card = card.parent.parent.parent.parent`.
- `reviewer_button = full_card.find("button")` finds the button that holds information about our reviewer.
- We find the user's name with the `aria-label` attribute: `name = reviewer_button.get("aria-label").replace("Photo of ", "")`. We also remove `"Photo of "` from the string that includes their name; this way, the only information we're saving is the reviewer name.
- We follow a similar method to the one above when extracting our rating: `int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))` (see the example below).
- `review_date = rating_tag.parent.find_all("span")[-1].text` finds all the `span` tags descended from the `parent` of our `rating_tag`. The last element is our review date, so we pull index `-1` from the array.
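To see what those string cleanups do, here's an illustrative run on made-up `aria-label` values with the same shape as the ones on the page:

```python
# Made-up aria-label values matching the shape described above.
reviewer_label = "Photo of Jane Doe"
rating_label = "5 stars"

name = reviewer_label.replace("Photo of ", "")                         # "Jane Doe"
stars = int(rating_label.replace(" stars", "").replace(" star", ""))   # 5

print(name, stars)
```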
Step 2: Loading URLs To Scrape
Next, we need to read the urls that we scraped during the crawl. We'll create another function similar to `start_scrape()`. This one needs to read our CSV file into an array of `dict` objects.
Then, it should iterate through the array and call our parsing function on each row we read from the file.
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
When we put it all together, it looks like this.
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent
rating_holder = full_card.select_one("span[role='img']")
rating = 0.0
rating_count = 0
if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)
success = True
logger.info(f"Successfully parsed data from: {url}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
        response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[role='main']")
info_cards = soup.find_all("div", class_="MyEned")
for card in info_cards:
review = card.text
full_card = card.parent.parent.parent.parent
reviewer_button = full_card.find("button")
name = reviewer_button.get("aria-label").replace("Photo of ", "")
rating_tag = full_card.select_one("span[role='img']")
stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
review_date = rating_tag.parent.find_all("span")[-1].text
review_data = {
"name": name,
"stars": stars,
"time_left": review_date,
"review_shortened": review
}
print(review_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
We already have a `DataPipeline` class. This makes our new storage really easy to implement. We just need to pass a `dataclass` into a `DataPipeline`. This new class will be used to represent reviews from the page.
Take a look at `ReviewData`; it's almost identical to `SearchData`.
@dataclass
class ReviewData:
name: str = ""
stars: int = 0
time_left: str = ""
review_shortened: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
In the full code below, we open a new `DataPipeline` from inside our parsing function. Then, as we extract our data, we convert it into `ReviewData`. That `ReviewData` then gets passed into the `DataPipeline` as we parse it.
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
stars: int = 0
time_left: str = ""
review_shortened: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent
rating_holder = full_card.select_one("span[role='img']")
rating = 0.0
rating_count = 0
if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)
success = True
logger.info(f"Successfully parsed data from: {url}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
        response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[role='main']")
info_cards = soup.find_all("div", class_="MyEned")
review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
for card in info_cards:
review = card.text
full_card = card.parent.parent.parent.parent
reviewer_button = full_card.find("button")
name = reviewer_button.get("aria-label").replace("Photo of ", "")
rating_tag = full_card.select_one("span[role='img']")
stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
review_date = rating_tag.parent.find_all("span")[-1].text
review_data = ReviewData(
name=name,
stars=stars,
time_left=review_date,
review_shortened=review
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 4: Adding Concurrency
For concurrency, we're going to use `ThreadPoolExecutor` just like we did before. We'll replace the `for` loop in `process_results()` with some more powerful, multithreaded code.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
- `process_business` is the function we want to call on all threads.
- All other arguments get passed in as lists, just like before.
Step 5: Bypassing Anti-Bots
We've already got our polished proxy function. All we need to do is use it in the right place. One line of our parsing function changes and everything is ready to go.
response = requests.get(get_scrapeops_url(url, location=location))
Here is our final code containing both the crawler and the scraper.
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
stars: int = 0
time_left: str = ""
review_shortened: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent
rating_holder = full_card.select_one("span[role='img']")
rating = 0.0
rating_count = 0
if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)
data_pipeline.add_data(search_data)
success = True
logger.info(f"Successfully parsed data from: {url}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)
def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
        try:
            response = requests.get(get_scrapeops_url(url, location=location))
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[role='main']")
info_cards = soup.find_all("div", class_="MyEned")
review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
for card in info_cards:
review = card.text
full_card = card.parent.parent.parent.parent
reviewer_button = full_card.find("button")
name = reviewer_button.get("aria-label").replace("Photo of ", "")
rating_tag = full_card.select_one("span[role='img']")
stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
review_date = rating_tag.parent.find_all("span")[-1].text
review_data = ReviewData(
name=name,
stars=stars,
time_left=review_date,
review_shortened=review
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Now it's time to test the entire thing in production. You can view our `main` block again below.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
If you remember from earlier, our crawl took 12.88 seconds.
The entire run for the crawl and scrape took 91.158 seconds and generated a crawl report with 22 results. 91.158 - 12.88 = 78.278 seconds spent scraping reviews. 78.278 seconds / 22 businesses = 3.558 seconds per page.
This is right on par with our crawler speed from earlier.
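If you'd like to reproduce these timings on your own machine, a minimal sketch is shown below. It assumes the functions and classes from the final script above are already defined, and simply wraps the crawl and the review scrape with timers. Your numbers will vary with network speed, proxy latency, and `MAX_THREADS`.

```python
import time

if __name__ == "__main__":
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    # Time the crawl.
    crawl_start = time.time()
    crawl_pipeline = DataPipeline(csv_filename="restaurant.csv")
    start_scrape("restaurant", "us", LOCALITIES, data_pipeline=crawl_pipeline,
                 max_threads=5, retries=3)
    crawl_pipeline.close_pipeline()
    crawl_end = time.time()

    # Time the review scrape.
    process_results("restaurant.csv", "us", max_threads=5, retries=3)
    scrape_end = time.time()

    print(f"Crawl: {crawl_end - crawl_start:.2f} seconds")
    print(f"Review scrape: {scrape_end - crawl_end:.2f} seconds")
```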
Legal and Ethical Considerations
Any time you scrape the web, you need to pay attention to what you're collecting and how you're collecting it. Public data (data not gated behind a login), like the data we scraped in this article, is generally considered legal to scrape in most jurisdictions.
Private data is a completely different story. If you decide to scrape private data, make sure you understand the laws and regulations that govern that data, because you're subject to them.
While our scrape was legal, it does potentially violate the Google Maps Terms of Service and Google's `robots.txt`. Violating either can lead to suspension or even deletion of your account, so review both documents before deciding how to use this scraper.
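If you want to see exactly which paths Google disallows before running anything, you can pull their `robots.txt` directly. A minimal sketch that filters for Maps-related rules might look like this:

```python
import requests

# Fetch Google's robots.txt and print the user-agent headers plus any
# rules that mention /maps.
response = requests.get("https://www.google.com/robots.txt")
for line in response.text.splitlines():
    if line.startswith("User-agent") or "/maps" in line:
        print(line)
```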
Conclusion
Scraping Google Reviews is a tricky task. It requires us to crawl Google Maps to obtain a list of businesses, and then scrape the reviews for each business on that list.
On top of that, the content is rendered dynamically, so we used the ScrapeOps Headless Browser to render it for us. By now you should have a solid grasp of Python Requests and BeautifulSoup, as well as parsing, data storage, concurrency, and proxy integration.
If you're interested in the tech we used when building this project and writing this article, look at the links below.
More Python Web Scraping Guides
Here at ScrapeOps, we wrote the playbook on scraping with Python. No matter what your skill level is, we've got something for you.
To learn more from our "How To Scrape" series, check out the links below!