How to Scrape Google Maps With Requests and BeautifulSoup
Google Maps is pretty much the central hub if you're looking for restaurants or other businesses in any location. When we scrape Google Maps, we have access to a wealth of data. We can get the location, rating, and hours of a business... depending on how much you're willing to dig, you can get even more information than this!
In this article, we'll learn how to build a scraper to collect data from Google Maps.
- TLDR: How to Scrape Google Maps
- How To Architect Our Scraper
- Understanding How To Scrape Google Maps
- Setting Up Our Google Maps Scraper
- Build A Google Maps Search Crawler
- Build A Google Maps Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
TLDR - How to Scrape Google Maps
If you're looking for a Google Maps scraper, look no further. The code below does exactly that.
- Just create a new project folder and install your dependencies.
- Then create a `config.json` file with your ScrapeOps API key.
- Copy and paste this code into a Python file and run it!
To run your script, use the following command.
python name_of_your_script.py
Here is the code for you to copy and paste.
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url`` on our behalf.

    Args:
        url: The page we actually want to scrape.
        location: Value sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the request options encoded as a query string.
    """
    query_string = urlencode(
        {
            "api_key": API_KEY,
            "url": url,
            "country": location,
            "wait": 5000,
            "residential": True,
        }
    )
    return "https://proxy.scrapeops.io/v1/?" + query_string
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business entry taken from a search results page.

    After construction every string field is normalized: an empty string is
    replaced with the placeholder ``"No <field name>"`` and any other string
    is stripped of surrounding whitespace. Non-string values are left as-is.
    """

    name: str = ""
    stars: float = 0
    url: str = ""
    # Defaults to 0 (not "") so an omitted count stays numeric instead of
    # being rewritten to the "No rating_count" placeholder.
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        # Named data_field so the loop does not shadow dataclasses.field.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
@dataclass
class BusinessData:
    """Address and opening hours scraped from a single business page.

    After construction every string field is normalized: an empty string is
    replaced with the placeholder ``"No <field name>"`` and any other string
    is stripped of surrounding whitespace.
    """

    name: str = ""
    street_address: str = ""
    city: str = ""
    state_and_zip: str = ""
    sunday: str = ""
    monday: str = ""
    tuesday: str = ""
    wednesday: str = ""
    thursday: str = ""
    friday: str = ""
    saturday: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        # Named data_field so the loop does not shadow dataclasses.field.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
class DataPipeline:
    """Buffer scraped dataclass items and append them to a CSV file.

    Items are de-duplicated by their ``name`` attribute, queued in memory, and
    written to ``csv_filename`` whenever the queue reaches
    ``storage_queue_limit`` or the pipeline is closed.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        # A set keeps the duplicate check constant-time.
        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file and empty the queue."""
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            # Only write the header when the file is new or empty.
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset on every exit path, including the empty-queue early return
            # and write errors, so the flag can never get stuck at True.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with this name was already seen; else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.add(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue limit is reached."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued to the CSV file."""
        if self.csv_file_open:
            # Imported here because the surrounding script never imports time.
            import time

            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    """Fetch one Google Maps search page through the proxy and store its results.

    Args:
        keyword: Search term; spaces are replaced with ``+`` for the URL.
        location: Country code forwarded to ``get_scrapeops_url``.
        locality: ``"latitude,longitude"`` string placed after ``@`` in the URL.
        data_pipeline: Object whose ``add_data`` receives each ``SearchData``.
            When omitted, parsed records are only logged.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If no attempt both returned status 200 and parsed cleanly.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    excluded_words = ["Sign in"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            for business_link in soup.select("div div a"):
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0
                if rating_holder:
                    # The text is split at "(" into the star value and the
                    # review count; partition tolerates a missing "(".
                    stars_text, _, count_text = rating_holder.text.partition("(")
                    try:
                        rating = float(stars_text.strip())
                        rating_count = int(count_text.replace(")", "").replace(",", "").strip())
                    except ValueError:
                        logger.warning(f"Could not fully parse rating text: {rating_holder.text!r}")

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                if data_pipeline is None:
                    logger.info(f"Parsed result (no pipeline supplied): {search_data}")
                else:
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Only mark success once parsing finished, so a parse failure is
            # retried instead of silently ending the loop.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    """Crawl every locality concurrently on a thread pool.

    Args:
        keyword: Search term passed to every ``scrape_search_results`` call.
        location: Country code passed to every call.
        localities: List of ``"latitude,longitude"`` strings, one task each.
        data_pipeline: Pipeline shared by all tasks.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed to every call.
    """

    def scrape_and_report(*args):
        # executor.map only re-raises a worker's exception when its result is
        # read, and the results are never read here, so log failures explicitly.
        try:
            scrape_search_results(*args)
        except Exception as e:
            logger.error(f"Giving up on locality {args[2]}: {e}")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_and_report,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )
def process_business(row, location, retries=3):
    """Scrape one business page and write its address and hours to a CSV file.

    The output file is named after the business (spaces replaced by ``-``).

    Args:
        row: Mapping with at least ``"name"`` and ``"url"`` keys.
        location: Country code forwarded to ``get_scrapeops_url``.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If every attempt failed.
    """
    import unicodedata

    url = row["url"]
    day_names = ("Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday")
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request is inside the try so network errors are retried too.
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

            for card in soup.find_all("div"):
                aria_label = card.get("aria-label")
                if not aria_label or "Information" not in aria_label:
                    continue
                button = card.find("button")
                if button is None:
                    continue

                # NOTE(review): the original called replace("", "") here, which
                # is a no-op; presumably an icon glyph was lost from the
                # literal. Dropping private-use characters covers that case —
                # confirm against a live page.
                address = "".join(
                    ch for ch in button.text if unicodedata.category(ch) != "Co"
                )
                address_parts = address.split(",")
                # Pad so a short address leaves the missing parts empty
                # instead of raising IndexError.
                address_parts += [""] * (3 - len(address_parts))
                street_address, city, state_and_zip = address_parts[:3]

                hours = {day: "" for day in day_names}
                for hours_row in card.find_all("tr"):
                    row_text = hours_row.text
                    for day in day_names:
                        if day in row_text:
                            hours[day] = row_text.replace(day, "")
                            break

                business_data = BusinessData(
                    name=row["name"],
                    street_address=street_address,
                    city=city,
                    state_and_zip=state_and_zip,
                    **{day.lower(): text for day, text in hours.items()}
                )
                business_pipeline.add_data(business_data)
                # Only the first matching information card is used.
                break

            business_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and scrape every listed business concurrently.

    Args:
        csv_file: Path of the CSV file to read rows from.
        location: Country code passed to every ``process_business`` call.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed to every call.
    """
    logger.info(f"processing {csv_file}")
    # Read with the same encoding DataPipeline writes with.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    def process_and_report(row, *rest):
        # executor.map only re-raises a worker's exception when its result is
        # read, and the results are never read here, so log failures explicitly.
        try:
            process_business(row, *rest)
        except Exception as e:
            logger.error(f"Giving up on business {row.get('name')}: {e}")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_and_report,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )
if __name__ == "__main__":
    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_name = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        start_scrape(
            keyword,
            LOCATION,
            LOCALITIES,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info("Crawl complete.")

    # Second pass: scrape each business listed in the crawl output.
    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To change your results, feel free to change any of the following constants:
- `MAX_RETRIES`: the maximum number of retries for a parse.
- `MAX_THREADS`: how many threads you'd like to use when parsing pages simultaneously.
- `LOCATION`: the location you'd like to appear from.
- `LOCALITIES`: the areas of the map you'd like to scrape. They need to be added as latitude and longitude pairs.
- `keyword_list`: the keywords you'd like to search the map for, in this instance `restaurant`.
How To Architect Our Google Maps Scraper
Our project is going to consist of two different scrapers.
1. Our first one will be a search crawler. The crawler has a pretty simple purpose: perform a search and extract the results into a CSV file.
2. Our scraper will then read the CSV and scrape the individual businesses inside it.
Here is the process for our crawler:
- Perform a search and parse the results.
- Store the results in a CSV file.
- Concurrently run steps 1 and 2 on separate localities.
- Integrate with a proxy to bypass anti-bots and anything else that might block us.
The scraper process is here:
- Read the businesses from the CSV.
- Iterate through the businesses and parse the results.
- Store the parsed data for each individual business.
- Run steps 2 and 3 concurrently for speed and efficiency.
- Again, integrate with a proxy to avoid getting blocked.
Understanding How To Scrape Google Maps
Before we build our crawler and scraper, we need to get a better understanding of this data from a high level.
In the sections below, we're going to go through Google Maps search pages and the individual business pages from Google Maps. This way, we know where our data is located and how we'd like to extract it.
Step 1: How To Request Google Maps Pages
Here is the URL for a Google Maps search for restaurants:
https://www.google.com/maps/search/Restaurants/@42.3753166,-83.4750232,15z/data=!3m1!4b1?entry=ttu
- `42.3753166,-83.4750232` represents the area we're searching. `42.3753166` is our latitude and `-83.4750232` is our longitude.
- So in order to search a specific area, you need to get the latitude and longitude of that area.
- `/Restaurants` tells Google that we'd like to search for restaurants.
Take a look at the image below to see the URL.
Step 2: How To Extract Data From Google Maps Results and Pages
To extract data from our maps search, we're going to pull the link elements out of the page. Since all the data is generated dynamically, we'll also need to use the `wait` parameter with the ScrapeOps Proxy API.
If you look at the image below, you can see a highlighted `a` element. Within that element, the business name is embedded as the `aria-label`.
To extract data from individual business pages, we follow a pretty similar strategy.
- All of our relevant information is located inside one specific `div` card.
- Every business card has an `aria-label` that begins with the word "Information" followed by the name of the business.
- To get these, we'll first find all the `div` elements and then filter through them until we find one with an `aria-label` containing the word "Information".
You can see this card highlighted below.
Step 3: Geolocated Data
To handle our geolocation, we'll be using two things.
- We'll pass a `country` param into the ScrapeOps API. This will route us through servers in a country of our choosing.
- We'll also create a `locality` parameter when building our url. This `locality` will be made from the latitude and longitude points you saw earlier.
Setting Up Our Google Maps Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir google-maps-scraper
cd google-maps-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build A Google Maps Search Crawler
Now that we know what we want to do, it's time to express it using code. We'll start by building a parser. Then, we'll add data storage, concurrency, and proxy integration.
If you follow the steps below, you'll be able to build a Google Maps crawler from scratch.
Step 1: Create Simple Search Data Parser
We're going to start with a basic parser. We'll add our imports, parsing functionality, retry logic and error handling.
While the basic structure is important, what you should really pay attention to here is the parsing logic.
In the code below, we find all a
elements descended from at least two div
elements. Then, we use some logic to filter out the links we don't want.
After we've found a target link, we find its parent element and pull the rest of our data from the parent element.
Here is the code we'll start with.
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, locality, retries=3):
    """Fetch one Google Maps search page and print each parsed result.

    Args:
        keyword: Search term; spaces are replaced with ``+`` for the URL.
        location: Accepted for interface compatibility; not used by this version.
        locality: ``"latitude,longitude"`` string placed after ``@`` in the URL.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If no attempt both returned status 200 and parsed cleanly.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    excluded_words = ["Sign in"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            for business_link in soup.select("div div a"):
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0
                if rating_holder:
                    # The text is split at "(" into the star value and the
                    # review count; partition tolerates a missing "(".
                    stars_text, _, count_text = rating_holder.text.partition("(")
                    try:
                        rating = float(stars_text.strip())
                        rating_count = int(count_text.replace(")", "").replace(",", "").strip())
                    except ValueError:
                        logger.warning(f"Could not fully parse rating text: {rating_holder.text!r}")

                search_data = {
                    "name": name,
                    "stars": rating,
                    "url": maps_link,
                    "rating_count": rating_count
                }
                # Emit the record; previously it was built and then discarded.
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Only mark success once parsing finished, so a parse failure is
            # retried instead of silently ending the loop.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
    """Scrape each locality in turn, one search page per locality.

    ``data_pipeline`` is accepted but not used by this version.
    """
    for area in localities:
        scrape_search_results(keyword, location, area, retries=retries)
if __name__ == "__main__":
    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        start_scrape(keyword, LOCATION, LOCALITIES, retries=MAX_RETRIES)
    logger.info("Crawl complete.")
- Our url is constructed like so: `https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu`.
  - `formatted_keyword` is the search we're performing.
  - `locality` holds the latitude and longitude coordinates for the area we'd like to scrape.
- We then request the url and, if the request succeeds:
  - `soup.select("div div a")` finds all of our link elements.
  - We use a simple `if` and `continue` statement to filter out unwanted links.
  - `business_link.get("aria-label")` pulls the name of the business.
  - `business_link.get("href")` finds the link to the site.
  - We set our defaults of `rating` and `rating_count` to zero.
  - If ratings are present, we pull the `rating` and `rating_count` with some string splitting and save them to their respective variables.
Step 2: Storing the Scraped Data
Now that we're extracting the proper data, we need to save that data to a CSV file. This way, people can read the data and so can other programs. By storing this data, we can review the file later and our scraper will be able to read the file later on when we build it.
We're going to start by building a dataclass
, SearchData
. This class will simply hold data that needs to be stored.
@dataclass
class SearchData:
    """One business entry taken from a search results page.

    After construction every string field is normalized: an empty string is
    replaced with the placeholder ``"No <field name>"`` and any other string
    is stripped of surrounding whitespace. Non-string values are left as-is.
    """

    name: str = ""
    stars: float = 0
    url: str = ""
    # Defaults to 0 (not "") so an omitted count stays numeric instead of
    # being rewritten to the "No rating_count" placeholder.
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        # Named data_field so the loop does not shadow dataclasses.field.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
We also need something to take in a dataclass
and store it. This is where our DataPipeline
comes into play. This class opens a pipe to a CSV file and filters out duplicates.
class DataPipeline:
    """Buffer scraped dataclass items and append them to a CSV file.

    Items are de-duplicated by their ``name`` attribute, queued in memory, and
    written to ``csv_filename`` whenever the queue reaches
    ``storage_queue_limit`` or the pipeline is closed.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        # A set keeps the duplicate check constant-time.
        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file and empty the queue."""
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            # Only write the header when the file is new or empty.
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset on every exit path, including the empty-queue early return
            # and write errors, so the flag can never get stuck at True.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with this name was already seen; else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.add(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue limit is reached."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued to the CSV file."""
        if self.csv_file_open:
            # Imported here because the surrounding script never imports time.
            import time

            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
With these classes added in, our full code now looks like this.
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business entry taken from a search results page.

    After construction every string field is normalized: an empty string is
    replaced with the placeholder ``"No <field name>"`` and any other string
    is stripped of surrounding whitespace. Non-string values are left as-is.
    """

    name: str = ""
    stars: float = 0
    url: str = ""
    # Defaults to 0 (not "") so an omitted count stays numeric instead of
    # being rewritten to the "No rating_count" placeholder.
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        # Named data_field so the loop does not shadow dataclasses.field.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
class DataPipeline:
    """Buffer scraped dataclass items and append them to a CSV file.

    Items are de-duplicated by their ``name`` attribute, queued in memory, and
    written to ``csv_filename`` whenever the queue reaches
    ``storage_queue_limit`` or the pipeline is closed.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        # A set keeps the duplicate check constant-time.
        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file and empty the queue."""
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            # Only write the header when the file is new or empty.
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset on every exit path, including the empty-queue early return
            # and write errors, so the flag can never get stuck at True.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with this name was already seen; else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.add(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue limit is reached."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued to the CSV file."""
        if self.csv_file_open:
            # Imported here because the surrounding script never imports time.
            import time

            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    """Fetch one Google Maps search page and store its results.

    Args:
        keyword: Search term; spaces are replaced with ``+`` for the URL.
        location: Accepted for interface compatibility; not used by this version.
        locality: ``"latitude,longitude"`` string placed after ``@`` in the URL.
        data_pipeline: Object whose ``add_data`` receives each ``SearchData``.
            When omitted, parsed records are only logged.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If no attempt both returned status 200 and parsed cleanly.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    excluded_words = ["Sign in"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            for business_link in soup.select("div div a"):
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0
                if rating_holder:
                    # The text is split at "(" into the star value and the
                    # review count; partition tolerates a missing "(".
                    stars_text, _, count_text = rating_holder.text.partition("(")
                    try:
                        rating = float(stars_text.strip())
                        rating_count = int(count_text.replace(")", "").replace(",", "").strip())
                    except ValueError:
                        logger.warning(f"Could not fully parse rating text: {rating_holder.text!r}")

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                if data_pipeline is None:
                    logger.info(f"Parsed result (no pipeline supplied): {search_data}")
                else:
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Only mark success once parsing finished, so a parse failure is
            # retried instead of silently ending the loop.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
    """Scrape each locality in turn, storing results in ``data_pipeline``.

    Args:
        keyword: Search term passed to every ``scrape_search_results`` call.
        location: Country code passed to every call.
        localities: List of ``"latitude,longitude"`` strings to search.
        data_pipeline: Pipeline that receives the parsed results.
        retries: Retry budget passed to every call.
    """
    for locality in localities:
        # Forward the pipeline; without it the scraper has nowhere to store
        # the records it parses.
        scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_name = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        start_scrape(
            keyword,
            LOCATION,
            LOCALITIES,
            data_pipeline=crawl_pipeline,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info("Crawl complete.")
- Before starting our scrape, we open a `DataPipeline`: `crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")`.
- We then pass the pipeline into `start_scrape()`, which in turn passes it into `scrape_search_results()`.
- Instead of printing our extracted data, we use it to make a `SearchData` object and pass it into the pipeline.
- Once the crawl has finished, we close the pipeline with `crawl_pipeline.close_pipeline()`.
Step 3: Adding Concurrency
`start_scrape()` already gives us the ability to scrape a list of `localities`. Instead of scraping them one at a time with a `for` loop, we should be scraping as many as possible at one time. This is where concurrency comes into play.
We're going to refactor `start_scrape()` to use multithreading instead of a `for` loop.
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    """Crawl every locality concurrently on a thread pool.

    Args:
        keyword: Search term passed to every ``scrape_search_results`` call.
        location: Country code passed to every call.
        localities: List of ``"latitude,longitude"`` strings, one task each.
        data_pipeline: Pipeline shared by all tasks.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed to every call.
    """

    def scrape_and_report(*args):
        # executor.map only re-raises a worker's exception when its result is
        # read, and the results are never read here, so log failures explicitly.
        try:
            scrape_search_results(*args)
        except Exception as e:
            logger.error(f"Giving up on locality {args[2]}: {e}")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_and_report,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )
Pay close attention to the arguments we pass into `executor.map()`:
- `scrape_search_results` is what we want to do on all the available threads.
- All subsequent arguments are going to get passed into `scrape_search_results`.
- `localities` is the list of areas we'd like to search.
- All other arguments get passed in as arrays the length of `localities`.
Here is our fully updated code.
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business entry taken from a search results page.

    After construction every string field is normalized: an empty string is
    replaced with the placeholder ``"No <field name>"`` and any other string
    is stripped of surrounding whitespace. Non-string values are left as-is.
    """

    name: str = ""
    stars: float = 0
    url: str = ""
    # Defaults to 0 (not "") so an omitted count stays numeric instead of
    # being rewritten to the "No rating_count" placeholder.
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill empty string fields with a placeholder and strip the others."""
        # Named data_field so the loop does not shadow dataclasses.field.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
            else:
                setattr(self, data_field.name, value.strip())
class DataPipeline:
    """Buffer scraped dataclass items and append them to a CSV file.

    Items are de-duplicated by their ``name`` attribute, queued in memory, and
    written to ``csv_filename`` whenever the queue reaches
    ``storage_queue_limit`` or the pipeline is closed.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        # A set keeps the duplicate check constant-time.
        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file and empty the queue."""
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            # Only write the header when the file is new or empty.
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset on every exit path, including the empty-queue early return
            # and write errors, so the flag can never get stuck at True.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with this name was already seen; else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.add(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue limit is reached."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued to the CSV file."""
        if self.csv_file_open:
            # Imported here because the surrounding script never imports time.
            import time

            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    """Fetch one Google Maps search page and store its results.

    Args:
        keyword: Search term; spaces are replaced with ``+`` for the URL.
        location: Accepted for interface compatibility; not used by this version.
        locality: ``"latitude,longitude"`` string placed after ``@`` in the URL.
        data_pipeline: Object whose ``add_data`` receives each ``SearchData``.
            When omitted, parsed records are only logged.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If no attempt both returned status 200 and parsed cleanly.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    excluded_words = ["Sign in"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            for business_link in soup.select("div div a"):
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0
                if rating_holder:
                    # The text is split at "(" into the star value and the
                    # review count; partition tolerates a missing "(".
                    stars_text, _, count_text = rating_holder.text.partition("(")
                    try:
                        rating = float(stars_text.strip())
                        rating_count = int(count_text.replace(")", "").replace(",", "").strip())
                    except ValueError:
                        logger.warning(f"Could not fully parse rating text: {rating_holder.text!r}")

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                if data_pipeline is None:
                    logger.info(f"Parsed result (no pipeline supplied): {search_data}")
                else:
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Only mark success once parsing finished, so a parse failure is
            # retried instead of silently ending the loop.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    """Crawl every locality concurrently on a thread pool.

    Args:
        keyword: Search term passed to every ``scrape_search_results`` call.
        location: Country code passed to every call.
        localities: List of ``"latitude,longitude"`` strings, one task each.
        data_pipeline: Pipeline shared by all tasks.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed to every call.
    """

    def scrape_and_report(*args):
        # executor.map only re-raises a worker's exception when its result is
        # read, and the results are never read here, so log failures explicitly.
        try:
            scrape_search_results(*args)
        except Exception as e:
            logger.error(f"Giving up on locality {args[2]}: {e}")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_and_report,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )
if __name__ == "__main__":
    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_name = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        start_scrape(
            keyword,
            LOCATION,
            LOCALITIES,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info("Crawl complete.")
Our crawler now has the ability to scrape multiple localities at the same time.
Step 4: Bypassing Anti-Bots
Before we do our production run, we need to add proxy integration. This will allow us to bypass anti-bots and anything else that might block our scraper.
Take a look at the function below. While it's similar to other proxy functions you'll find in other ScrapeOps guides, there are a few key differences.
We add some additional arguments like `wait` and `residential`.
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url`` on our behalf.

    Args:
        url: The page we actually want to scrape.
        location: Value sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the request options encoded as a query string.
    """
    query_string = urlencode(
        {
            "api_key": API_KEY,
            "url": url,
            "country": location,
            "wait": 5000,
            "residential": True,
        }
    )
    return "https://proxy.scrapeops.io/v1/?" + query_string
- `"api_key"`: holds your ScrapeOps API key.
- `"url"`: is the url we want to scrape.
- `"country"`: is the location we'd like to be routed through.
- `"wait": 5000`: tells the ScrapeOps server that we want it to wait 5 seconds for content to render before sending our response back.
- `"residential": True`: tells ScrapeOps that we want to use a residential IP address. This greatly decreases our likelihood of getting blocked.
Here is our finalized code for the crawler.
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url``.

    The query string carries the module-level ``API_KEY``, the target ``url``,
    ``location`` as the ``country`` value, ``wait`` set to 5000 and
    ``residential`` set to True.
    """
    query = urlencode({
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    })
    return "https://proxy.scrapeops.io/v1/?" + query
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business listing parsed from a search results page."""

    name: str = ""
    stars: float = 0
    url: str = ""
    # Previously defaulted to "", which contradicted the int annotation and
    # was rewritten to the text "No rating_count" by check_string_fields().
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip string-valued fields; replace empty ones with "No <field name>"."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())
class DataPipeline:
    """Drop records with an already-seen name and append the rest to a CSV file in batches."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Drain the queue and append its records to ``csv_filename``.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag: the early return on an empty queue (and
            # any write error) used to leave it stuck at True.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate record and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued."""
        if self.csv_file_open:
            # Imported here because the file's import list has no `time`;
            # the bare name previously raised NameError on this path.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    """Fetch one search page through the proxy and queue every listing found.

    Args:
        keyword: Search term; spaces are replaced with "+".
        location: Passed to get_scrapeops_url() as the proxy location.
        locality: String interpolated after "@" in the search URL.
        data_pipeline: Object whose add_data() receives each SearchData.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If no attempt both returned HTTP 200 and parsed cleanly.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0
                if rating_holder:
                    # The text is expected to read "<stars>(<count>)". partition()
                    # tolerates a missing "(" where indexing the split raised.
                    stars_text, _, count_text = rating_holder.text.partition("(")
                    rating = stars_text
                    digits = count_text.replace(")", "").replace(",", "").strip()
                    if digits.isdigit():
                        rating_count = int(digits)

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            # Flag success only after parsing: it used to be set right after
            # the status check, so a parse error ended the loop silently.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    """Submit one scrape_search_results call per locality to a thread pool.

    The iterator returned by ``map`` is discarded, so worker results and
    exceptions are never retrieved here.
    """
    job_count = len(localities)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pool:
        # map() takes one argument list per parameter of the worker.
        pool.map(
            scrape_search_results,
            [keyword] * job_count,
            [location] * job_count,
            localities,
            [data_pipeline] * job_count,
            [retries] * job_count
        )
if __name__ == "__main__":
    # Crawl settings: retry budget, worker threads, proxy country.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    # One search request is issued per coordinate string.
    LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_name = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        # Flush whatever is still queued before moving to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info(f"Crawl complete.")
Step 5: Production Run
Our crawler is finished and it's time to test it out in production. We'll be scraping three different localities. If you want to tweak your results, go ahead and change any of the constants in the `main` block below.
if __name__ == "__main__":
    # Crawl settings: retry budget, worker threads, proxy country.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    # One search request is issued per coordinate string.
    LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_name = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        # Flush whatever is still queued before moving to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info(f"Crawl complete.")
Here are the final results.
It took us 11.779 seconds to crawl 3 separate localities and generate a file with 24 search results... even with the 5 second wait on each page. This comes out to roughly 3.93 seconds per locality.
Build A Google Maps Scraper
Now that we have a functional crawler, we need a functional scraper. Our scraper needs to read the CSV from earlier. Then it needs to parse each individual business and store its data.
Step 1: Create Simple Business Data Parser
Like before, we're going to start with a basic data parser. First, we find all the `div` objects. Then we iterate through them until we've found the `div` that holds the business info.
Once we've found that card, we pull the address and business hours from it.
def process_business(row, location, retries=3):
    """Fetch one business page and print its address and weekly opening hours.

    Args:
        row: CSV row (dict) from the crawl; ``row["url"]`` and ``row["name"]`` are read.
        location: Not used by this version of the function.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request lives inside the try so that connection errors are
            # retried like bad status codes instead of escaping the loop.
            response = requests.get(url)
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                info_cards = soup.find_all("div")
                for card in info_cards:
                    aria_label = card.get("aria-label")
                    if not aria_label:
                        continue
                    if "Information" not in aria_label:
                        continue
                    print("card exists")

                    button = card.find("button")
                    # NOTE(review): replace("", "") is a no-op as written; the
                    # first argument was presumably a glyph lost from the text - confirm.
                    address = button.text.replace("", "")
                    address_array = address.split(",")
                    street_address = address_array[0]
                    city = address_array[1]
                    state_and_zip = address_array[2]

                    sunday = ""
                    monday = ""
                    tuesday = ""
                    wednesday = ""
                    thursday = ""
                    friday = ""
                    saturday = ""

                    hours_cards = card.find_all("tr")
                    # Distinct loop name: the original reused `card` here.
                    for hours_row in hours_cards:
                        row_text = hours_row.text
                        if "Sunday" in row_text:
                            sunday = row_text.replace("Sunday", "")
                        elif "Monday" in row_text:
                            monday = row_text.replace("Monday", "")
                        elif "Tuesday" in row_text:
                            tuesday = row_text.replace("Tuesday", "")
                        elif "Wednesday" in row_text:
                            wednesday = row_text.replace("Wednesday", "")
                        elif "Thursday" in row_text:
                            thursday = row_text.replace("Thursday", "")
                        elif "Friday" in row_text:
                            friday = row_text.replace("Friday", "")
                        elif "Saturday" in row_text:
                            saturday = row_text.replace("Saturday", "")
                        else:
                            continue

                    business_data = {
                        "name": row["name"],
                        "street_address": street_address,
                        "city": city,
                        "state_and_zip": state_and_zip,
                        "sunday": sunday,
                        "monday": monday,
                        "tuesday": tuesday,
                        "wednesday": wednesday,
                        "thursday": thursday,
                        "friday": friday,
                        "saturday": saturday
                    }
                    print(business_data)
                    # Only the first matching card is used.
                    break
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
- `if not aria_label` is used to filter out any `div` cards that don't hold the `aria-label` attribute.
- `if "Information" not in aria_label` is used to skip any `div` cards that have the wrong `aria-label`.
- Once we've found our target card, we pull the address info from a `button` element and then use some string splitting to separate different pieces of the address: `street_address`, `city`, `state_and_zip`.
- We name a variable after each day of the week and give it an empty string default value.
- We find all the `tr` elements and assign the daily hours to their corresponding variables.
Step 2: Loading URLs To Scrape
To use our parser, we need to give it businesses to process. To feed it a list of businesses, we have to read the CSV generated by our crawler. Here, `process_results()` does exactly that.
It reads the CSV file, and then it runs `process_business()` on each and every one of those results.
def process_results(csv_file, location, retries=3):
    """Read the crawl CSV and run process_business on every row, one at a time.

    Args:
        csv_file: Path of the CSV file to read.
        location: Forwarded to process_business.
        retries: Forwarded to process_business.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes these files as UTF-8; read them back the same way
    # rather than relying on the platform's default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))
    for row in reader:
        process_business(row, location, retries=retries)
Here is our fully updated code.
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url``.

    The query string carries the module-level ``API_KEY``, the target ``url``,
    ``location`` as the ``country`` value, ``wait`` set to 5000 and
    ``residential`` set to True.
    """
    query = urlencode({
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    })
    return "https://proxy.scrapeops.io/v1/?" + query
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business listing parsed from a search results page."""

    name: str = ""
    stars: float = 0
    url: str = ""
    # Previously defaulted to "", which contradicted the int annotation and
    # was rewritten to the text "No rating_count" by check_string_fields().
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip string-valued fields; replace empty ones with "No <field name>"."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())
class DataPipeline:
    """Drop records with an already-seen name and append the rest to a CSV file in batches."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Drain the queue and append its records to ``csv_filename``.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag: the early return on an empty queue (and
            # any write error) used to leave it stuck at True.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate record and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued."""
        if self.csv_file_open:
            # Imported here because the file's import list has no `time`;
            # the bare name previously raised NameError on this path.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    """Fetch one search page through the proxy and queue every listing found.

    Args:
        keyword: Search term; spaces are replaced with "+".
        location: Passed to get_scrapeops_url() as the proxy location.
        locality: String interpolated after "@" in the search URL.
        data_pipeline: Object whose add_data() receives each SearchData.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If no attempt both returned HTTP 200 and parsed cleanly.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0
                if rating_holder:
                    # The text is expected to read "<stars>(<count>)". partition()
                    # tolerates a missing "(" where indexing the split raised.
                    stars_text, _, count_text = rating_holder.text.partition("(")
                    rating = stars_text
                    digits = count_text.replace(")", "").replace(",", "").strip()
                    if digits.isdigit():
                        rating_count = int(digits)

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            # Flag success only after parsing: it used to be set right after
            # the status check, so a parse error ended the loop silently.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    """Submit one scrape_search_results call per locality to a thread pool.

    The iterator returned by ``map`` is discarded, so worker results and
    exceptions are never retrieved here.
    """
    job_count = len(localities)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pool:
        # map() takes one argument list per parameter of the worker.
        pool.map(
            scrape_search_results,
            [keyword] * job_count,
            [location] * job_count,
            localities,
            [data_pipeline] * job_count,
            [retries] * job_count
        )
def process_business(row, location, retries=3):
    """Fetch one business page and print its address and weekly opening hours.

    Args:
        row: CSV row (dict) from the crawl; ``row["url"]`` and ``row["name"]`` are read.
        location: Not used by this version of the function.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request lives inside the try so that connection errors are
            # retried like bad status codes instead of escaping the loop.
            response = requests.get(url)
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                info_cards = soup.find_all("div")
                for card in info_cards:
                    aria_label = card.get("aria-label")
                    if not aria_label:
                        continue
                    if "Information" not in aria_label:
                        continue
                    print("card exists")

                    button = card.find("button")
                    # NOTE(review): replace("", "") is a no-op as written; the
                    # first argument was presumably a glyph lost from the text - confirm.
                    address = button.text.replace("", "")
                    address_array = address.split(",")
                    street_address = address_array[0]
                    city = address_array[1]
                    state_and_zip = address_array[2]

                    sunday = ""
                    monday = ""
                    tuesday = ""
                    wednesday = ""
                    thursday = ""
                    friday = ""
                    saturday = ""

                    hours_cards = card.find_all("tr")
                    # Distinct loop name: the original reused `card` here.
                    for hours_row in hours_cards:
                        row_text = hours_row.text
                        if "Sunday" in row_text:
                            sunday = row_text.replace("Sunday", "")
                        elif "Monday" in row_text:
                            monday = row_text.replace("Monday", "")
                        elif "Tuesday" in row_text:
                            tuesday = row_text.replace("Tuesday", "")
                        elif "Wednesday" in row_text:
                            wednesday = row_text.replace("Wednesday", "")
                        elif "Thursday" in row_text:
                            thursday = row_text.replace("Thursday", "")
                        elif "Friday" in row_text:
                            friday = row_text.replace("Friday", "")
                        elif "Saturday" in row_text:
                            saturday = row_text.replace("Saturday", "")
                        else:
                            continue

                    business_data = {
                        "name": row["name"],
                        "street_address": street_address,
                        "city": city,
                        "state_and_zip": state_and_zip,
                        "sunday": sunday,
                        "monday": monday,
                        "tuesday": tuesday,
                        "wednesday": wednesday,
                        "thursday": thursday,
                        "friday": friday,
                        "saturday": saturday
                    }
                    print(business_data)
                    # Only the first matching card is used.
                    break
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
    """Read the crawl CSV and run process_business on every row, one at a time.

    Args:
        csv_file: Path of the CSV file to read.
        location: Forwarded to process_business.
        retries: Forwarded to process_business.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes these files as UTF-8; read them back the same way
    # rather than relying on the platform's default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))
    for row in reader:
        process_business(row, location, retries=retries)
if __name__ == "__main__":
    # Run settings: retry budget, worker threads, proxy country.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    # One search request is issued per coordinate string.
    LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_name = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        # Flush whatever is still queued before moving to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info(f"Crawl complete.")

    # Second pass: visit every business listed in the crawl output.
    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
We need to add another `dataclass`. We'll call this one `BusinessData`. This class will hold the business's address data and hours. After we instantiate a `BusinessData` object, we'll need to pass it into a `DataPipeline` like we did before.
Here is our `BusinessData` class.
@dataclass
class BusinessData:
    """Address and per-day opening hours of a single business."""

    name: str = ""
    street_address: str = ""
    city: str = ""
    state_and_zip: str = ""
    sunday: str = ""
    monday: str = ""
    tuesday: str = ""
    wednesday: str = ""
    thursday: str = ""
    friday: str = ""
    saturday: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip string-valued fields; replace empty ones with "No <field name>"."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = f"No {spec.name}" if current == "" else current.strip()
            setattr(self, spec.name, cleaned)
In our updated code below, we create another `DataPipeline` from inside our parsing function. We then create a `BusinessData` object to pass into it and close the pipeline once we've stored the data.
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url``.

    The query string carries the module-level ``API_KEY``, the target ``url``,
    ``location`` as the ``country`` value, ``wait`` set to 5000 and
    ``residential`` set to True.
    """
    query = urlencode({
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    })
    return "https://proxy.scrapeops.io/v1/?" + query
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business listing parsed from a search results page."""

    name: str = ""
    stars: float = 0
    url: str = ""
    # Previously defaulted to "", which contradicted the int annotation and
    # was rewritten to the text "No rating_count" by check_string_fields().
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip string-valued fields; replace empty ones with "No <field name>"."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())
@dataclass
class BusinessData:
    """Address and per-day opening hours of a single business."""

    name: str = ""
    street_address: str = ""
    city: str = ""
    state_and_zip: str = ""
    sunday: str = ""
    monday: str = ""
    tuesday: str = ""
    wednesday: str = ""
    thursday: str = ""
    friday: str = ""
    saturday: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip string-valued fields; replace empty ones with "No <field name>"."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = f"No {spec.name}" if current == "" else current.strip()
            setattr(self, spec.name, cleaned)
class DataPipeline:
    """Drop records with an already-seen name and append the rest to a CSV file in batches."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Drain the queue and append its records to ``csv_filename``.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag: the early return on an empty queue (and
            # any write error) used to leave it stuck at True.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate record and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued."""
        if self.csv_file_open:
            # Imported here because the file's import list has no `time`;
            # the bare name previously raised NameError on this path.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    """Fetch one search page through the proxy and queue every listing found.

    Args:
        keyword: Search term; spaces are replaced with "+".
        location: Passed to get_scrapeops_url() as the proxy location.
        locality: String interpolated after "@" in the search URL.
        data_pipeline: Object whose add_data() receives each SearchData.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If no attempt both returned HTTP 200 and parsed cleanly.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0
                if rating_holder:
                    # The text is expected to read "<stars>(<count>)". partition()
                    # tolerates a missing "(" where indexing the split raised.
                    stars_text, _, count_text = rating_holder.text.partition("(")
                    rating = stars_text
                    digits = count_text.replace(")", "").replace(",", "").strip()
                    if digits.isdigit():
                        rating_count = int(digits)

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            # Flag success only after parsing: it used to be set right after
            # the status check, so a parse error ended the loop silently.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    """Submit one scrape_search_results call per locality to a thread pool.

    The iterator returned by ``map`` is discarded, so worker results and
    exceptions are never retrieved here.
    """
    job_count = len(localities)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pool:
        # map() takes one argument list per parameter of the worker.
        pool.map(
            scrape_search_results,
            [keyword] * job_count,
            [location] * job_count,
            localities,
            [data_pipeline] * job_count,
            [retries] * job_count
        )
def process_business(row, location, retries=3):
    """Fetch one business page and store its address and weekly hours in a CSV file.

    The output file is named after ``row["name"]`` with spaces replaced by "-".

    Args:
        row: CSV row (dict) from the crawl; ``row["url"]`` and ``row["name"]`` are read.
        location: Not used by this version of the function.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request lives inside the try so that connection errors are
            # retried like bad status codes instead of escaping the loop.
            response = requests.get(url)
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                business_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
                info_cards = soup.find_all("div")
                for card in info_cards:
                    aria_label = card.get("aria-label")
                    if not aria_label:
                        continue
                    if "Information" not in aria_label:
                        continue
                    print("card exists")

                    button = card.find("button")
                    # NOTE(review): replace("", "") is a no-op as written; the
                    # first argument was presumably a glyph lost from the text - confirm.
                    address = button.text.replace("", "")
                    address_array = address.split(",")
                    street_address = address_array[0]
                    city = address_array[1]
                    state_and_zip = address_array[2]

                    sunday = ""
                    monday = ""
                    tuesday = ""
                    wednesday = ""
                    thursday = ""
                    friday = ""
                    saturday = ""

                    hours_cards = card.find_all("tr")
                    # Distinct loop name: the original reused `card` here.
                    for hours_row in hours_cards:
                        row_text = hours_row.text
                        if "Sunday" in row_text:
                            sunday = row_text.replace("Sunday", "")
                        elif "Monday" in row_text:
                            monday = row_text.replace("Monday", "")
                        elif "Tuesday" in row_text:
                            tuesday = row_text.replace("Tuesday", "")
                        elif "Wednesday" in row_text:
                            wednesday = row_text.replace("Wednesday", "")
                        elif "Thursday" in row_text:
                            thursday = row_text.replace("Thursday", "")
                        elif "Friday" in row_text:
                            friday = row_text.replace("Friday", "")
                        elif "Saturday" in row_text:
                            saturday = row_text.replace("Saturday", "")
                        else:
                            continue

                    business_data = BusinessData(
                        name=row["name"],
                        street_address=street_address,
                        city=city,
                        state_and_zip=state_and_zip,
                        sunday=sunday,
                        monday=monday,
                        tuesday=tuesday,
                        wednesday=wednesday,
                        thursday=thursday,
                        friday=friday,
                        saturday=saturday
                    )
                    business_pipeline.add_data(business_data)
                    # Only the first matching card is used.
                    break
                business_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
    """Read the crawl CSV and run process_business on every row, one at a time.

    Args:
        csv_file: Path of the CSV file to read.
        location: Forwarded to process_business.
        retries: Forwarded to process_business.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes these files as UTF-8; read them back the same way
    # rather than relying on the platform's default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))
    for row in reader:
        process_business(row, location, retries=retries)
if __name__ == "__main__":
    # Run settings: retry budget, worker threads, proxy country.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    # One search request is issued per coordinate string.
    LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_name = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        # Flush whatever is still queued before moving to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info(f"Crawl complete.")

    # Second pass: visit every business listed in the crawl output.
    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
Step 4: Adding Concurrency
Once again, we'll use `ThreadPoolExecutor` to replace a `for` loop. Just like before, our first arg is the function we'd like to call. Each argument after is a list that gets passed into `process_business`.
def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and run process_business on every row using a thread pool.

    Args:
        csv_file: Path of the CSV file to read.
        location: Forwarded to process_business for every row.
        max_threads: Size of the worker thread pool.
        retries: Forwarded to process_business for every row.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes these files as UTF-8; read them back the same way
    # rather than relying on the platform's default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # map() takes one argument list per parameter of process_business.
        executor.map(
            process_business,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )
- `process_business` is the function we want to call on open threads.
- All arguments to `process_business` get passed in as lists.
Step 5: Bypassing Anti-Bots
We already have a function that gives us proxy integration, we just need to use it again. In order to do this, we'll change one final line of the parsing function.
response = requests.get(get_scrapeops_url(url, location=location))
Here is our finalized code containing both the crawler and the scraper.
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url``.

    The query string carries the module-level ``API_KEY``, the target ``url``,
    ``location`` as the ``country`` value, ``wait`` set to 5000 and
    ``residential`` set to True.
    """
    query = urlencode({
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    })
    return "https://proxy.scrapeops.io/v1/?" + query
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business listing parsed from a search results page."""

    name: str = ""
    stars: float = 0
    url: str = ""
    # Previously defaulted to "", which contradicted the int annotation and
    # was rewritten to the text "No rating_count" by check_string_fields().
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip string-valued fields; replace empty ones with "No <field name>"."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, value.strip())
@dataclass
class BusinessData:
    """Address and per-day opening hours of a single business."""

    name: str = ""
    street_address: str = ""
    city: str = ""
    state_and_zip: str = ""
    sunday: str = ""
    monday: str = ""
    tuesday: str = ""
    wednesday: str = ""
    thursday: str = ""
    friday: str = ""
    saturday: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip string-valued fields; replace empty ones with "No <field name>"."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = f"No {spec.name}" if current == "" else current.strip()
            setattr(self, spec.name, cleaned)
class DataPipeline:
    """Drop records with an already-seen name and append the rest to a CSV file in batches."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Drain the queue and append its records to ``csv_filename``.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag: the early return on an empty queue (and
            # any write error) used to leave it stuck at True.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate record and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued."""
        if self.csv_file_open:
            # Imported here because the file's import list has no `time`;
            # the bare name previously raised NameError on this path.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url