

How to Scrape Google Maps With Requests and BeautifulSoup

Google Maps is pretty much the central hub for finding restaurants and other businesses in any location. When we scrape Google Maps, we get access to a wealth of data: the location, rating, and hours of a business, and, depending on how deep you're willing to dig, quite a bit more.

In this article, we'll learn how to build a scraper to collect data from Google Maps.


TLDR - How to Scrape Google Maps

If you're looking for a Google Maps scraper, look no further. The code below does exactly that.

  1. Just create a new project folder and install your dependencies.
  2. Then create a config.json file containing your ScrapeOps API key (see the example below).
  3. Copy and paste this code into a Python file and run it!
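For reference, the script only reads a single api_key field from config.json, so the file can be as simple as this (the key below is a placeholder):

{
  "api_key": "YOUR-SCRAPEOPS-API-KEY"
}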

To run your script, use the following command.

python name_of_your_script.py

Here is the code for you to copy and paste.

import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class BusinessData:
    name: str = ""
    street_address: str = ""
    city: str = ""
    state_and_zip: str = ""
    sunday: str = ""
    monday: str = ""
    tuesday: str = ""
    wednesday: str = ""
    thursday: str = ""
    friday: str = ""
    saturday: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True

            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0

                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )

                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )


def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                business_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

                info_cards = soup.find_all("div")

                for card in info_cards:
                    aria_label = card.get("aria-label")
                    if not aria_label:
                        continue
                    if "Information" not in aria_label:
                        continue
                    print("card exists")

                    button = card.find("button")
                    address = button.text.replace("", "")
                    address_array = address.split(",")
                    street_address = address_array[0]
                    city = address_array[1]
                    state_and_zip = address_array[2]

                    sunday = ""
                    monday = ""
                    tuesday = ""
                    wednesday = ""
                    thursday = ""
                    friday = ""
                    saturday = ""

                    hours_cards = card.find_all("tr")
                    for card in hours_cards:
                        row_text = card.text
                        if "Sunday" in row_text:
                            sunday = row_text.replace("Sunday", "")
                        elif "Monday" in row_text:
                            monday = row_text.replace("Monday", "")
                        elif "Tuesday" in row_text:
                            tuesday = row_text.replace("Tuesday", "")
                        elif "Wednesday" in row_text:
                            wednesday = row_text.replace("Wednesday", "")
                        elif "Thursday" in row_text:
                            thursday = row_text.replace("Thursday", "")
                        elif "Friday" in row_text:
                            friday = row_text.replace("Friday", "")
                        elif "Saturday" in row_text:
                            saturday = row_text.replace("Saturday", "")
                        else:
                            continue

                    business_data = BusinessData(
                        name=row["name"],
                        street_address=street_address,
                        city=city,
                        state_and_zip=state_and_zip,
                        sunday=sunday,
                        monday=monday,
                        tuesday=tuesday,
                        wednesday=wednesday,
                        thursday=thursday,
                        friday=friday,
                        saturday=saturday
                    )
                    business_pipeline.add_data(business_data)
                    break

                business_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

To change your results, feel free to change any of the following constants:

  • MAX_RETRIES: the maximum number of times to retry a failed request or parse.
  • MAX_THREADS: how many threads you'd like to use when parsing pages simultaneously.
  • LOCATION: the location you'd like to appear from.
  • LOCALITIES: the areas of the map you'd like to scrape. They need to be added in as latitude and longitude pairs.
  • keyword_list: the keywords you'd like to search the map for, in this instance restaurant.
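For example, a run that searches for coffee shops around a couple of sample coordinate pairs (illustrative values only) would just change these lines:

LOCALITIES = ["40.71,-74.0", "40.75,-73.99"]  # sample latitude,longitude pairs
keyword_list = ["coffee shop"]                # spaces get converted to "+" automatically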

How To Architect Our Google Maps Scraper

Our project is going to consist of two different scrapers.

  1. Our first scraper will be a search crawler. Its purpose is simple: perform a search and extract the results into a CSV file.
  2. Our second scraper will then read that CSV and scrape the individual businesses listed inside it.

Here is the process for our crawler:

  1. Perform a search and parse the results.
  2. Store the results in a CSV file.
  3. Run steps 1 and 2 concurrently on separate localities.
  4. Integrate with a proxy to bypass anti-bots and anything else that might block us.

The scraper process is here:

  1. Read the businesses from the CSV.
  2. Iterate through the businesses and parse the results.
  3. Store the parsed data for each individual business.
  4. Run steps 2 and 3 concurrently for speed and efficiency.
  5. Again, integrate with a proxy to avoid getting blocked.

Understanding How To Scrape Google Maps

Before we build our crawler and scraper, we need to get a better understanding of this data from a high level.

In the sections below, we're going to go through Google Maps search pages and the individual business pages from Google Maps. This way, we know where our data is located and how we'd like to extract it.


Step 1: How To Request Google Maps Pages

Here is the URL for a Google Maps search for restaurants:

https://www.google.com/maps/search/Restaurants/@42.3753166,-83.4750232,15z/data=!3m1!4b1?entry=ttu
  • 42.3753166,-83.4750232 represents the area we're searching.
  • 42.3753166 is our latitude and -83.4750232 is our longitude. In order to search a specific area, you need the latitude and longitude of that area (a short sketch below shows how to build the URL in Python).
  • /Restaurants tells Google that we'd like to search for restaurants.
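To make that concrete, here is a minimal sketch of how those pieces combine into a search URL. The keyword and coordinates are sample values:

keyword = "restaurant"
locality = "42.3753166,-83.4750232"  # latitude,longitude of the area to search
zoom = "15z"                         # the number before "z" controls how far the map is zoomed in

formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},{zoom}/data=!3m1!4b1?entry=ttu"
print(url)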

Take a look at the image below to see the URL.

Google Maps Page


Step 2: How To Extract Data From Google Maps Results and Pages

To extract data from our maps search, we're going to pull the link elements out of the page. Since all the data is generated dynamically, we'll also need to use the wait parameter with the ScrapeOps Proxy API.

If you look at the image below, you can see an a element highlighted. Within that element, the business name is embedded in the aria-label attribute.

Google Maps HTML Inspection
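As a rough sketch, once the rendered page has been loaded into BeautifulSoup, those names and links can be pulled with a few lines like these (soup is assumed to already hold the search page HTML):

business_links = soup.select("div div a")
for business_link in business_links:
    name = business_link.get("aria-label")  # the business name, when the link has one
    href = business_link.get("href")        # the link to the business on Google Maps
    if name:
        print(name, href)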

To extract data from individual business pages, we follow a pretty similar strategy.

  • All of our relevant information is located inside one specific div card.
  • Each business card has an aria-label that begins with the word "Information" followed by the name of the business.
  • To get these, we'll first find all the divs and then filter through them until we find one with an aria-label containing the word "Information".

You can see this card highlighted below.

Google Maps Business HTML Inspection


Step 3: Geolocated Data

To handle our geolocation, we'll be using two things.

  1. We'll pass a country param into the ScrapeOps API. This will route us through servers in a country of our choosing.
  2. We'll also create a locality parameter when building our url. This locality will be made from the latitude and longitude points you saw earlier.
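Roughly speaking, the locality ends up inside the Google Maps URL while the country gets passed to the proxy. Here's a small sketch of how they fit together, using the get_scrapeops_url() helper from the TLDR code above (sample values only):

locality = "42.3,-83.5"  # latitude,longitude of the map area
location = "us"          # country the proxy should route us through

url = f"https://www.google.com/maps/search/restaurant/@{locality},14z/data=!3m1!4b1?entry=ttu"
response = requests.get(get_scrapeops_url(url, location=location))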

Setting Up Our Google Maps Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir google-maps-scraper

cd google-maps-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build A Google Maps Search Crawler

Now that we know what we want to do, it's time to express it using code. We'll start by building a parser. Then, we'll add data storage, concurrency, and proxy integration.

If you follow the steps below, you'll be able to build a Google Maps crawler from scratch.


Step 1: Create Simple Search Data Parser

We're going to start with a basic parser. We'll add our imports, parsing functionality, retry logic and error handling.

While the basic structure is important, what you should really pay attention to here is the parsing logic.

In the code below, we find all a elements descended from at least two div elements. Then, we use some logic to filter out the links we don't want.

After we've found a target link, we find its parent element and pull the rest of our data from the parent element.

Here is the code we'll start with.

import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True

            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0

                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = {
                    "name": name,
                    "stars": rating,
                    "url": maps_link,
                    "rating_count": rating_count
                }

                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
    for locality in localities:
        scrape_search_results(keyword, location, locality, retries=retries)



if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        start_scrape(keyword, LOCATION, LOCALITIES, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
  • Our url is constructed like so: https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu.
    • formatted_keyword is the search we're performing.
    • locality holds the latitude and longitude coordinates for the area we'd like to scrape.
  • We then request the url. If the request comes back successfully, we parse the page:
    • soup.select("div div a") finds all of our link elements.
    • We use a simple if and continue statement to filter out unwanted links.
    • business_link.get("aria-label") pulls the name of the business.
    • business_link.get("href") finds the link to the site.
    • We set our defaults of rating and rating_count to zero.
    • If ratings are present, we pull the rating and rating_count with some string splitting and save them to their respective variables.
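To see that string splitting in action, assume the rating holder's text looks something like "4.3(1,204)" (the exact format on the live page can vary):

rating_text = "4.3(1,204)"

rating_array = rating_text.split("(")
rating = rating_array[0]                                               # "4.3"
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))  # 1204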

Step 2: Storing the Scraped Data

Now that we're extracting the proper data, we need to save it to a CSV file so that both people and other programs can read it. By storing this data, we can review the file later, and the scraper we build next will be able to read it as input.

We're going to start by building a dataclass, SearchData. This class will simply hold data that needs to be stored.

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

We also need something to take in a dataclass and store it. This is where our DataPipeline comes into play. This class opens a pipe to a CSV file and filters out duplicates.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
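Here's an illustrative look at how the pipeline gets used; the values are made up, and in the real crawler the SearchData objects come from the parser:

pipeline = DataPipeline(csv_filename="restaurant.csv")

pipeline.add_data(SearchData(name="Example Diner", stars=4.5, url="https://www.google.com/maps/place/example", rating_count=120))
pipeline.add_data(SearchData(name="Example Diner", stars=4.5, url="https://www.google.com/maps/place/example", rating_count=120))  # duplicate name, gets dropped

pipeline.close_pipeline()  # flushes whatever is left in the queue to restaurant.csv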

With these classes added in, our full code now looks like this.

import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())



class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
for locality in localities:
scrape_search_results(keyword, location, locality, retries=retries)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • Before starting our scrape, we open a DataPipeline, crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv").
  • We then pass the pipeline into start_scrape() which in turn passes it into scrape_search_results().
  • Instead of printing our extracted data, we use it to make a SearchData object and pass it into the pipeline.
  • Once the crawl has finished, we close the pipeline with crawl_pipeline.close_pipeline().

Step 3: Adding Concurrency

start_scrape() already gives us the ability to scrape a list of localities. Instead of scraping them one at a time with a for loop, we should be scraping as many as possible at one time. This is where concurrency comes into play.

We're going to refactor start_scrape() to use multithreading instead of a for loop.

def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )

Pay close attention to the arguments we pass into executor.map():

  • scrape_search_results is what we want to do on all the available threads.
  • All subsequent arguments are going to get passed into scrape_search_results.
  • localities is the list of areas we'd like to search.
  • All other arguments are passed in as lists the same length as localities; the toy sketch below shows how the calls line up.
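This tiny standalone sketch (with a dummy function standing in for scrape_search_results) shows how executor.map pairs the lists up:

import concurrent.futures

def dummy_scrape(keyword, location, locality, data_pipeline, retries):
    print(f"scraping '{keyword}' near {locality} from {location} with {retries} retries")

localities = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each call receives one element from every iterable:
    # dummy_scrape("restaurant", "us", "42.3,-83.5", None, 3), and so on.
    executor.map(
        dummy_scrape,
        ["restaurant"] * len(localities),
        ["us"] * len(localities),
        localities,
        [None] * len(localities),
        [3] * len(localities)
    )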

Here is our fully updated code.

import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())



class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Our crawler now has the ability to scrape multiple localities at the same time.


Step 4: Bypassing Anti-Bots

Before we do our production run, we need to add proxy integration. This will allow us to bypass anti-bots and anything else that might block our scraper.

Take a look at the function below. While it's similar to other proxy functions you'll find in other ScrapeOps guides, there are a few key differences.

We add some additional arguments like wait and residential.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
  • "api_key": holds your ScrapeOps API key.
  • "url": is the url we want to scrape.
  • "country": is the location we'd like to be routed through.
  • "wait": 5000: tells the ScrapeOps server that we want it to wait 5 seconds for content to render before sending our response back.
  • "residential": True tells ScrapeOps that we want to use a residential IP address. This greatly decreases our likelihood of getting blocked.

Here is our finalized code for the crawler.

import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())



class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 5: Production Run

Our crawler is finished and it's time to test it out in production. We'll be scraping three different localities. If you want to tweak your results, go ahead and change any of the constants from the main below.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

Here are the final results.

Crawler Performance Terminal

It took us 11.779 seconds to crawl 3 separate localities and generate a file with 24 search results... even with the 5 second wait on each page. This comes out to roughly 3.93 seconds per locality.


Build A Google Maps Scraper

Now that we have a functional crawler, we need a functional scraper. Our scraper needs to read the CSV from earlier. Then it needs to parse each individual business and store its data.

Step 1: Create Simple Business Data Parser

Like before, we're going to start with a basic data parser. First, we find all the div objects. Then we iterate through them until we've found the div that holds the business info.

Once we've found that card, we pull the address and business hours from it.

def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                info_cards = soup.find_all("div")

                for card in info_cards:
                    aria_label = card.get("aria-label")
                    if not aria_label:
                        continue
                    if "Information" not in aria_label:
                        continue
                    print("card exists")

                    button = card.find("button")
                    address = button.text.replace("", "")
                    address_array = address.split(",")
                    street_address = address_array[0]
                    city = address_array[1]
                    state_and_zip = address_array[2]

                    sunday = ""
                    monday = ""
                    tuesday = ""
                    wednesday = ""
                    thursday = ""
                    friday = ""
                    saturday = ""

                    hours_cards = card.find_all("tr")
                    for card in hours_cards:
                        row_text = card.text
                        if "Sunday" in row_text:
                            sunday = row_text.replace("Sunday", "")
                        elif "Monday" in row_text:
                            monday = row_text.replace("Monday", "")
                        elif "Tuesday" in row_text:
                            tuesday = row_text.replace("Tuesday", "")
                        elif "Wednesday" in row_text:
                            wednesday = row_text.replace("Wednesday", "")
                        elif "Thursday" in row_text:
                            thursday = row_text.replace("Thursday", "")
                        elif "Friday" in row_text:
                            friday = row_text.replace("Friday", "")
                        elif "Saturday" in row_text:
                            saturday = row_text.replace("Saturday", "")
                        else:
                            continue

                    business_data = {
                        "name": row["name"],
                        "street_address": street_address,
                        "city": city,
                        "state_and_zip": state_and_zip,
                        "sunday": sunday,
                        "monday": monday,
                        "tuesday": tuesday,
                        "wednesday": wednesday,
                        "thursday": thursday,
                        "friday": friday,
                        "saturday": saturday
                    }
                    print(business_data)
                    break

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
  • if not aria_label is used to filter out any div cards that don't hold the aria-label attribute.
  • if "Information" not in aria_label is used to skip any div cards that have the wrong aria-label.
  • Once we've found our target card, we pull the address info from a button element and then use some string splitting to separate different pieces of the address: street_address, city, state_and_zip.
  • We name a variable after each day of the week and give it an empty string default value.
  • We find all the tr elements and assign the daily hours to their corresponding variables.
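For example, if one of the tr rows renders to text like "Monday9 AM to 9 PM" (the real text may differ slightly), stripping the day name leaves just the hours:

row_text = "Monday9 AM to 9 PM"

monday = ""
if "Monday" in row_text:
    monday = row_text.replace("Monday", "")  # "9 AM to 9 PM"
print(monday)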

Step 2: Loading URLs To Scrape

To use our parser, we need to give it businesses to process. To feed it a list of businesses, we have to read the CSV generated by our crawler. Here, process_results() does exactly that.

It reads the CSV file, and then it runs process_business() on each and every one of those results.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)

Here is our fully updated code.

import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())



class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)


def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
info_cards = soup.find_all("div")

for card in info_cards:
aria_label = card.get("aria-label")
if not aria_label:
continue
if "Information" not in aria_label:
continue
print("card exists")

button = card.find("button")
address = button.text.replace("", "")
address_array = address.split(",")
street_address = address_array[0]
city = address_array[1]
state_and_zip = address_array[2]

sunday = ""
monday = ""
tuesday = ""
wednesday = ""
thursday = ""
friday = ""
saturday = ""

hours_cards = card.find_all("tr")
for card in hours_cards:
row_text = card.text
if "Sunday" in row_text:
sunday = row_text.replace("Sunday", "")
elif "Monday" in row_text:
monday = row_text.replace("Monday", "")
elif "Tuesday" in row_text:
tuesday = row_text.replace("Tuesday", "")
elif "Wednesday" in row_text:
wednesday = row_text.replace("Wednesday", "")
elif "Thursday" in row_text:
thursday = row_text.replace("Thursday", "")
elif "Friday" in row_text:
friday = row_text.replace("Friday", "")
elif "Saturday" in row_text:
saturday = row_text.replace("Saturday", "")
else:
continue

business_data = {
"name": row["name"],
"street_address": street_address,
"city": city,
"state_and_zip": state_and_zip,
"sunday": sunday,
"monday": monday,
"tuesday": tuesday,
"wednesday": wednesday,
"thursday": thursday,
"friday": friday,
"saturday": saturday
}
print(business_data)
break

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_business(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

We need to add another dataclass. We'll call this one BusinessData. This class will hold the business's address data and hours. After we instantiate a BusinessData object, we'll need to pass it into a DataPipeline like we did before.

Here is our BusinessData class.

@dataclass
class BusinessData:
    name: str = ""
    street_address: str = ""
    city: str = ""
    state_and_zip: str = ""
    sunday: str = ""
    monday: str = ""
    tuesday: str = ""
    wednesday: str = ""
    thursday: str = ""
    friday: str = ""
    saturday: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In our updated code below, we create another DataPipeline from inside our parsing function. We then create a BusinessData object to pass into it and close the pipeline once we've stored the data.

import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class BusinessData:
name: str = ""
street_address: str = ""
city: str = ""
state_and_zip: str = ""
sunday: str = ""
monday: str = ""
tuesday: str = ""
wednesday: str = ""
thursday: str = ""
friday: str = ""
saturday: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)


def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
business_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

info_cards = soup.find_all("div")

for card in info_cards:
aria_label = card.get("aria-label")
if not aria_label:
continue
if "Information" not in aria_label:
continue
print("card exists")

button = card.find("button")
address = button.text.replace("", "")
address_array = address.split(",")
street_address = address_array[0]
city = address_array[1]
state_and_zip = address_array[2]

sunday = ""
monday = ""
tuesday = ""
wednesday = ""
thursday = ""
friday = ""
saturday = ""

hours_cards = card.find_all("tr")
for card in hours_cards:
row_text = card.text
if "Sunday" in row_text:
sunday = row_text.replace("Sunday", "")
elif "Monday" in row_text:
monday = row_text.replace("Monday", "")
elif "Tuesday" in row_text:
tuesday = row_text.replace("Tuesday", "")
elif "Wednesday" in row_text:
wednesday = row_text.replace("Wednesday", "")
elif "Thursday" in row_text:
thursday = row_text.replace("Thursday", "")
elif "Friday" in row_text:
friday = row_text.replace("Friday", "")
elif "Saturday" in row_text:
saturday = row_text.replace("Saturday", "")
else:
continue

business_data = BusinessData(
name=row["name"],
street_address=street_address,
city=city,
state_and_zip=state_and_zip,
sunday=sunday,
monday=monday,
tuesday=tuesday,
wednesday=wednesday,
thursday=thursday,
friday=friday,
saturday=saturday
)
business_pipeline.add_data(business_data)
break

business_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_business(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 4: Adding Concurrency

Once again, we'll use ThreadPoolExecutor to replace a for loop. Just like before, the first argument is the function we want to call on each thread. Every argument after that is a list whose items get passed into process_business.

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
  • process_business is the function we want to call on each available thread.
  • Every other argument to process_business gets passed in as a list.
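
If the executor.map() broadcasting pattern still feels abstract, here is a minimal, self-contained sketch of the same trick. The greet() function and its inputs are made up purely for illustration; the point is that the iterables are zipped element-wise, so each call receives one row plus the shared settings.

import concurrent.futures

# Hypothetical stand-in for process_business: one row plus shared settings.
def greet(row, location, retries):
    return f"{row} | location={location} | retries={retries}"

rows = ["Pizza Palace", "Burger Barn", "Taco Town"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # The iterables are zipped element-wise, so this runs:
    # greet("Pizza Palace", "us", 3), greet("Burger Barn", "us", 3), ...
    results = list(executor.map(greet, rows, ["us"] * len(rows), [3] * len(rows)))

for result in results:
    print(result)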

Step 5: Bypassing Anti-Bots

We already have a function that handles proxy integration; we just need to use it again. To do this, we change one final line of the parsing function.

response = requests.get(get_scrapeops_url(url, location=location))
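
If you're curious what that wrapped request actually looks like, here's a quick standalone sketch that reuses the get_scrapeops_url() function from earlier and simply prints the result (the API key below is a placeholder, not a real key).

from urllib.parse import urlencode

API_KEY = "YOUR-SCRAPEOPS-API-KEY"  # placeholder, normally loaded from config.json

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

target = "https://www.google.com/maps/search/restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu"

# The Google Maps URL is percent-encoded into the 'url' query parameter,
# so the proxy fetches the page on our behalf and returns the HTML.
print(get_scrapeops_url(target, location="us"))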

Here is our finalized code containing both the crawler and the scraper.

import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
rating_count: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class BusinessData:
name: str = ""
street_address: str = ""
city: str = ""
state_and_zip: str = ""
sunday: str = ""
monday: str = ""
tuesday: str = ""
wednesday: str = ""
thursday: str = ""
friday: str = ""
saturday: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
business_links = soup.select("div div a")
excluded_words = ["Sign in"]
for business_link in business_links:
name = business_link.get("aria-label")
if not name or name in excluded_words or "Visit" in name:
continue
maps_link = business_link.get("href")
full_card = business_link.parent

rating_holder = full_card.select_one("span[role='img']")

rating = 0.0
rating_count = 0

if rating_holder:
rating_array = rating_holder.text.split("(")
rating = rating_array[0]
rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

search_data = SearchData(
name=name,
stars=rating,
url=maps_link,
rating_count=rating_count
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * len(localities),
[location] * len(localities),
localities,
[data_pipeline] * len(localities),
[retries] * len(localities)
)


def process_business(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
business_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

info_cards = soup.find_all("div")

for card in info_cards:
aria_label = card.get("aria-label")
if not aria_label:
continue
if "Information" not in aria_label:
continue
print("card exists")

button = card.find("button")
address = button.text.replace("", "")
address_array = address.split(",")
street_address = address_array[0]
city = address_array[1]
state_and_zip = address_array[2]

sunday = ""
monday = ""
tuesday = ""
wednesday = ""
thursday = ""
friday = ""
saturday = ""

hours_cards = card.find_all("tr")
for card in hours_cards:
row_text = card.text
if "Sunday" in row_text:
sunday = row_text.replace("Sunday", "")
elif "Monday" in row_text:
monday = row_text.replace("Monday", "")
elif "Tuesday" in row_text:
tuesday = row_text.replace("Tuesday", "")
elif "Wednesday" in row_text:
wednesday = row_text.replace("Wednesday", "")
elif "Thursday" in row_text:
thursday = row_text.replace("Thursday", "")
elif "Friday" in row_text:
friday = row_text.replace("Friday", "")
elif "Saturday" in row_text:
saturday = row_text.replace("Saturday", "")
else:
continue

business_data = BusinessData(
name=row["name"],
street_address=street_address,
city=city,
state_and_zip=state_and_zip,
sunday=sunday,
monday=monday,
tuesday=tuesday,
wednesday=wednesday,
thursday=thursday,
friday=friday,
saturday=saturday
)
business_pipeline.add_data(business_data)
break

business_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Time to test out the whole thing. We'll use the same settings as we did earlier, but feel free to change the constants if you want to tweak your results (there's an example of this after the main block below).

Here is our main.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["restaurant"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
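
As an example of tweaking those constants, the block below (with made-up coordinates roughly around Chicago) would crawl coffee shops instead of restaurants; everything else in the script stays the same.

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"
# Hypothetical latitude,longitude pairs roughly covering Chicago
LOCALITIES = ["41.85,-87.65", "41.90,-87.65", "41.95,-87.65"]

keyword_list = ["coffee shop"]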

You can see the results from the crawl and scrape below.


The full crawl and scrape took 86.452 seconds. The crawler generated a report with 23 businesses. If you remember from earlier, the crawl alone took 11.779 seconds. 86.452 - 11.779 = 74.673 seconds spent scraping. 74.673 / 23 ≈ 3.25 seconds per business.
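
If you'd like to reproduce that back-of-the-envelope math for your own runs, here's a tiny sketch using the numbers above (swap in your own timings and row count).

total_runtime = 86.452       # full crawl + scrape, in seconds
crawl_runtime = 11.779       # crawl-only time from the earlier run
businesses_scraped = 23      # rows in the crawl report

scrape_runtime = total_runtime - crawl_runtime
per_business = scrape_runtime / businesses_scraped

print(f"Scrape time: {scrape_runtime:.3f} seconds")   # 74.673 seconds
print(f"Per business: {per_business:.2f} seconds")    # ~3.25 seconds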


Legal and Ethical Considerations

When you scrape Google Maps, you are subject to both the Google Maps terms of service and Google's robots.txt. Violating these terms can get your Google account suspended or even permanently deleted, so be careful about breaking rules online.

  • The Google Maps terms are available here.
  • Their robots.txt is here.

Always remember that public data is generally considered public knowledge, so when you're scraping it you're usually in the clear legally.

Private data is another matter. When you scrape private data, you are subject to that company's terms and the privacy laws that apply to that company.

If you're not sure whether your scraper is legal, consult an attorney.


Conclusion

Now that you've finished this tutorial, you know how to scrape Google Maps with Requests and BeautifulSoup, and you should have a solid grasp of parsing, data storage, concurrency, and proxy integration. Go take these skills and build something cool!

If you'd like to learn more about the tech used in this article, check out the official documentation below.

  • Requests: https://requests.readthedocs.io/
  • BeautifulSoup: https://www.crummy.com/software/BeautifulSoup/bs4/doc/


More Python Web Scraping Guides

At ScrapeOps, we've always got learning materials for you. If you'd like to see some other topics that use Requests and BeautifulSoup, check out our Python Web Scraping Playbook.

If you'd like to learn more from our "How To Scrape" series, check out the links below.