Then check out ScrapeOps, the complete toolkit for web scraping.
To run this scraper, first create a `config.json` file containing your ScrapeOps API key:

```json
{"api_key": "your-super-secret-api-key"}
```

Then run the script with `python name_of_your_python_file.py`.
Here is the full code for our Google Maps reviews scraper:

```python
import os
import re
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


# Wrap a URL so the request is routed through the ScrapeOps Proxy Aggregator.
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    time_left: str = ""
    review_shortened: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


# Crawl one Google Maps search page for a given keyword and locality.
def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )


# Scrape the reviews for a single business taken from the crawl CSV.
def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                main_card = soup.select_one("div[role='main']")

                info_cards = soup.find_all("div", class_="MyEned")
                review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
                for card in info_cards:
                    review = card.text
                    full_card = card.parent.parent.parent.parent
                    reviewer_button = full_card.find("button")
                    name = reviewer_button.get("aria-label").replace("Photo of ", "")
                    rating_tag = full_card.select_one("span[role='img']")
                    stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
                    review_date = rating_tag.parent.find_all("span")[-1].text

                    review_data = ReviewData(
                        name=name,
                        stars=stars,
                        time_left=review_date,
                        review_shortened=review
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
To change your results, feel free to tweak any of the following constants inside `main`:

- `MAX_RETRIES`: the max number of retries for a parse.
- `MAX_THREADS`: how many threads you'd like to use when parsing pages simultaneously.
- `LOCATION`: the location you'd like to appear from.
- `LOCALITIES`: the areas of the map you'd like to scrape. They need to be added as latitude and longitude pairs.
- `keyword_list`: the keywords you'd like to search the map for.

During the crawl, we'll extract the `name`, `stars`, `url`, and `rating_count` for each business. Then, we'll save these to a CSV file. Then, our review scraper will go through and find reviews for each of these businesses.

Our Maps crawler will need to do the following: run a search for each locality we give it, pull these fields out of the results, and save everything to a CSV file. Here is an example of a Google Maps business URL:

```
https://www.google.com/maps/place/Leo's+Coney+Island/@42.3937072,-83.4828338,17z/data=!4m6!3m5!1s0x8824acedc1b6f397:0xaa85d06de541a352!8m2!3d42.3937072!4d-83.4828338!16s%2Fg%2F1tf299fd?authuser=0&hl=en&entry=ttu&g_ep=EgoyMDI0MDkwOC4wIKXMDSoASAFQAw%3D%3D
```

In this URL, `@42.3937072,-83.4828338` is our latitude and longitude. Each business result is held in an `a` tag with a link to the restaurant information. On a business page, each review is held in a `div` with a class of `MyEned`. Once we find this element, we can find its `parent` elements. Once we've found the correct `parent` element, we can find all of the other information we need. To control where on the map we search, we use the latitude and longitude embedded in the URL, e.g. `@42.3937072,-83.4828338`.
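As a quick illustration of that URL structure, here is a small sketch (not part of the scraper itself) that pulls the latitude and longitude out of a Maps URL with a regular expression. The pattern is an assumption based on the `@lat,lng` format shown above.

```python
import re

maps_url = ("https://www.google.com/maps/place/Leo's+Coney+Island/"
            "@42.3937072,-83.4828338,17z/data=!4m6!3m5!1s0x8824acedc1b6f397:0xaa85d06de541a352")

# Look for the "@latitude,longitude" chunk that Google embeds in Maps URLs.
match = re.search(r"@(-?\d+\.\d+),(-?\d+\.\d+)", maps_url)
if match:
    latitude, longitude = float(match.group(1)), float(match.group(2))
    print(latitude, longitude)  # 42.3937072 -83.4828338
```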
To control our geolocation, we can use the `country` param with the ScrapeOps Proxy Aggregator. For example, if we want to appear in the US, we pass `{"country": "us"}` to ScrapeOps.
Create a new project folder and move into it:

```bash
mkdir google-reviews-scraper
cd google-reviews-scraper
```

Create and activate a virtual environment:

```bash
python -m venv venv
source venv/bin/activate
```

Install your dependencies:

```bash
pip install requests
pip install beautifulsoup4
```
We'll start by building a crawler around a parsing function, `scrape_search_results()`. Pay close attention to the parsing logic going on in this script.

```python
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = {
                    "name": name,
                    "stars": rating,
                    "url": maps_link,
                    "rating_count": rating_count
                }
                print(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, location, localities, retries=3):
    for locality in localities:
        scrape_search_results(keyword, location, locality, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        start_scrape(keyword, LOCATION, LOCALITIES, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
```
Here is how the parsing logic works:

- `business_links = soup.select("div div a")` finds all of our business link elements.
- `business_link.get("aria-label")` gives us the name of each business.
- `business_link.get("href")` gives us the link to each business.
- We find the `parent` element of the business link with `full_card = business_link.parent`.
- `full_card.select_one("span[role='img']")` finds our rating holder.

Now that we can parse results, we need proper data storage. First, we need a `dataclass` to represent our search results. Then, we need a pipeline to a CSV file. Here is our new `dataclass`. We'll call it `SearchData`.
```python
@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
Next is our `DataPipeline`. This class opens a pipe to a CSV file and filters out duplicates using their `name` attribute.

```python
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
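To make the flow concrete, here is a small, hypothetical usage sketch (assuming `SearchData` and `DataPipeline` are defined as above, with `time` imported for `close_pipeline()`): we open a pipeline to a CSV file, add a couple of `SearchData` objects, and close the pipeline so anything left in the queue gets written. The filename and business values below are purely illustrative.

```python
pipeline = DataPipeline(csv_filename="example.csv")

pipeline.add_data(SearchData(name="Leo's Coney Island", stars=4.4,
                             url="https://www.google.com/maps/place/Leos+Coney+Island",
                             rating_count=1200))
# Same name again: is_duplicate() catches it and the item gets dropped with a warning.
pipeline.add_data(SearchData(name="Leo's Coney Island", stars=4.4,
                             url="https://www.google.com/maps/place/Leos+Coney+Island",
                             rating_count=1200))

pipeline.close_pipeline()  # flushes whatever is still in the storage queue to example.csv
```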
In our updated code below, we open a `DataPipeline` and pass it into `start_scrape()`. It then gets passed into `scrape_search_results()`. Instead of finding and printing our data as a `dict` object, we create a `SearchData` object and pass it into our `DataPipeline`.

```python
import os
import re
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
    for locality in localities:
        scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
In the code above:

- `SearchData` is used to represent individual search results from our crawl.
- `DataPipeline` is used to pipe all of our `SearchData` objects to a CSV file and remove the duplicates.

`start_scrape()` already allows us to crawl a list of different localities. To crawl this list concurrently, we just need to refactor `start_scrape()` and replace the `for` loop with something a little more powerful. We'll do this using `ThreadPoolExecutor`. This opens up a new pool of threads and runs our parsing function on each thread concurrently.

Here is our old version of `start_scrape()`:

```python
def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
    for locality in localities:
        scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries)
```
And here is the new, multithreaded version:

```python
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )
```
`executor.map()` is the portion that actually replaces the `for` loop. Take a look at the args:

- `scrape_search_results`: the function we want to call.
- `[keyword] * len(localities)`: our keyword passed in as a list.
- `[location] * len(localities)`: our location passed in as a list.
- `localities`: the list of localities we'd like to crawl.
- `[data_pipeline] * len(localities)`: our `DataPipeline` object passed in as a list.
- `[retries] * len(localities)`: our retry limit passed in as a list.

`executor.map()` takes these lists and passes them into a bunch of separate instances of our parsing function, as the small sketch below illustrates.
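Here is a tiny, self-contained sketch (unrelated to Google Maps) of the same pattern, just to show how `executor.map()` lines the lists up argument by argument:

```python
import concurrent.futures

def greet(greeting, name, punctuation):
    print(f"{greeting}, {name}{punctuation}")

names = ["Alice", "Bob", "Carol"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each call receives one element from each list: greet("Hello", "Alice", "!"), etc.
    executor.map(
        greet,
        ["Hello"] * len(names),  # the same greeting repeated for every name
        names,                   # the list we actually want to fan out over
        ["!"] * len(names)       # the same punctuation repeated for every name
    )
```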
When talking to the proxy, we also need to `wait` for our content to render. We need to tell the ScrapeOps Proxy Aggregator the following four things when making our requests:

- `"api_key"`: your ScrapeOps API key.
- `"url"`: the url we want to scrape.
- `"country"`: the country we want our request to be routed through. This parameter uses a location of our choice when we make the request.
- `"wait"`: how long to wait before sending our response. This allows the content to render on their end before we get it back.

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
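Usage is straightforward: wrap any URL you want to fetch. A quick, hypothetical example (it assumes `requests` and `get_scrapeops_url()` from the script above, and the target URL is just an illustration):

```python
target = "https://www.google.com/maps/search/restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu"
proxied = get_scrapeops_url(target, location="us")
# "proxied" now points at https://proxy.scrapeops.io/v1/ with api_key, url,
# country, and wait passed along as URL-encoded query parameters.
response = requests.get(proxied)
```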
Here is our full crawler code with proxy integration:

```python
import os
import re
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
As before, you can tweak your results by changing the constants inside `main`:

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
- `MAX_RETRIES`: the max number of retries for a parse.
- `MAX_THREADS`: how many threads you'd like to use when parsing pages simultaneously.
- `LOCATION`: the location you'd like to appear from.
- `LOCALITIES`: the areas of the map you'd like to scrape. They need to be added as latitude and longitude pairs.
- `keyword_list`: the keywords you'd like to search the map for.

Now it's time to build our review scraper. Like before, we'll start with a parsing function, `process_business()`.

```python
def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                main_card = soup.select_one("div[role='main']")

                info_cards = soup.find_all("div", class_="MyEned")
                for card in info_cards:
                    review = card.text
                    full_card = card.parent.parent.parent.parent
                    reviewer_button = full_card.find("button")
                    name = reviewer_button.get("aria-label").replace("Photo of ", "")
                    rating_tag = full_card.select_one("span[role='img']")
                    stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
                    review_date = rating_tag.parent.find_all("span")[-1].text

                    review_data = {
                        "name": name,
                        "stars": stars,
                        "time_left": review_date,
                        "review_shortened": review
                    }
                    print(review_data)

                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
Here is how the review parsing works:

- First, we find our `info_card` items: `info_cards = soup.find_all("div", class_="MyEned")`.
- We grab the text of each review with `review = card.text`.
- We use the `parent` attribute to find the full review card that includes the reviewer name and rating: `full_card = card.parent.parent.parent.parent`.
- `reviewer_button = full_card.find("button")` finds the button that holds information about our reviewer.
- We pull the reviewer's name from its `aria-label` attribute: `name = reviewer_button.get("aria-label").replace("Photo of ", "")`. We also remove `"Photo of "` from the string that includes their name; this way, the only information we're saving is the reviewer name.
- We convert the rating to a number with `int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))`.
- `review_date = rating_tag.parent.find_all("span")[-1].text` finds all the `span` tags descended from the `parent` of our `rating_tag`. The last element is our review date, so we pull index `-1` from the array (see the short sketch after this list).
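Here is that attribute parsing in isolation, on made-up values (the `aria-label` strings below are hypothetical but follow the format described above):

```python
# Hypothetical aria-label values in the format Google uses.
reviewer_label = "Photo of Jane Doe"
rating_label = "4 stars"

name = reviewer_label.replace("Photo of ", "")                        # "Jane Doe"
stars = int(rating_label.replace(" stars", "").replace(" star", ""))  # 4

print(name, stars)
```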
Next, we need a function similar to `start_scrape()`. This one needs to read our CSV file into an array of `dict` objects. Then, it should iterate through the array and call our parsing function on each row we read from the file.

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)
```
Putting it all together, our code now looks like this:

```python
import os
import re
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )


def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                main_card = soup.select_one("div[role='main']")

                info_cards = soup.find_all("div", class_="MyEned")
                for card in info_cards:
                    review = card.text
                    full_card = card.parent.parent.parent.parent
                    reviewer_button = full_card.find("button")
                    name = reviewer_button.get("aria-label").replace("Photo of ", "")
                    rating_tag = full_card.select_one("span[role='img']")
                    stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
                    review_date = rating_tag.parent.find_all("span")[-1].text

                    review_data = {
                        "name": name,
                        "stars": stars,
                        "time_left": review_date,
                        "review_shortened": review
                    }
                    print(review_data)

                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
```
We already have a `DataPipeline` class. This makes our new storage really easy to implement. We just need to pass a `dataclass` into a `DataPipeline`. This new class will be used to represent reviews from the page. Take a look at `ReviewData`; it's almost identical to `SearchData`.

```python
@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    time_left: str = ""
    review_shortened: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
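One detail worth noticing: `__post_init__()` cleans up string fields, so an empty value gets a readable placeholder instead of an empty cell in the CSV. A quick illustration with made-up values:

```python
review = ReviewData(name="", stars=5, time_left=" a month ago ", review_shortened="Great coney dogs!")
print(review.name)       # "No name"      (empty strings get a default)
print(review.time_left)  # "a month ago"  (surrounding whitespace gets stripped)
```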
In the code below, we open a new `DataPipeline` from inside our parsing function. Then, as we extract our data, we convert it into `ReviewData`. That `ReviewData` then gets passed into the `DataPipeline` as we parse it.

```python
import os
import re
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    time_left: str = ""
    review_shortened: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )


def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                main_card = soup.select_one("div[role='main']")

                info_cards = soup.find_all("div", class_="MyEned")
                review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
                for card in info_cards:
                    review = card.text
                    full_card = card.parent.parent.parent.parent
                    reviewer_button = full_card.find("button")
                    name = reviewer_button.get("aria-label").replace("Photo of ", "")
                    rating_tag = full_card.select_one("span[role='img']")
                    stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
                    review_date = rating_tag.parent.find_all("span")[-1].text

                    review_data = ReviewData(
                        name=name,
                        stars=stars,
                        time_left=review_date,
                        review_shortened=review
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
```
To add concurrency to the review scraper, we'll use `ThreadPoolExecutor` just like we did before. We'll replace the `for` loop in `process_results()` with some more powerful, multithreaded code.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
`process_business` is the function we want to call on all threads; the other arguments (our CSV rows, location, and retry limit) get passed in as lists, just like before.

Finally, to route these requests through the proxy, we change one line in `process_business()`:

```python
response = requests.get(get_scrapeops_url(url, location=location))
```

Here is our full, production-ready code:

```python
import os
import re
import csv
import requests
import json
import time
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    time_left: str = ""
    review_shortened: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )


def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                main_card = soup.select_one("div[role='main']")

                info_cards = soup.find_all("div", class_="MyEned")
                review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
                for card in info_cards:
                    review = card.text
                    full_card = card.parent.parent.parent.parent
                    reviewer_button = full_card.find("button")
                    name = reviewer_button.get("aria-label").replace("Photo of ", "")
                    rating_tag = full_card.select_one("span[role='img']")
                    stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
                    review_date = rating_tag.parent.find_all("span")[-1].text

                    review_data = ReviewData(
                        name=name,
                        stars=stars,
                        time_left=review_date,
                        review_shortened=review
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
You can see our `main` again below.

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
When scraping, always pay attention to a site's terms of service and `robots.txt`. Violating these can lead to suspension and even deletion of your account. You can view these documents from Google Maps below.
If you prefer Selenium, here is a version of the same scraper built on `webdriver`. As before, create a `config.json` file with your ScrapeOps API key.

```python
import os
import csv
from selenium import webdriver
from selenium.webdriver.common.by import By
import json
import time
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    time_left: str = ""
    review_shortened: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = driver.get(scrapeops_proxy_url)

            business_links = driver.find_elements(By.CSS_SELECTOR, "div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get_attribute("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get_attribute("href")
                full_card = business_link.find_element(By.XPATH, "..")

                rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span")
                rating = 0.0
                rating_count = 0
                has_rating = rating_holders[0].get_attribute("innerHTML")
                if has_rating:
                    rating = has_rating
                    rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "")

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )


def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(get_scrapeops_url(url))

            info_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='MyEned']")
            review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
            for card in info_cards:
                review = card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML")
                full_card = card.find_element(By.XPATH, "../../../..")
                reviewer_button = full_card.find_element(By.CSS_SELECTOR, "button")
                name = reviewer_button.get_attribute("aria-label").replace("Photo of ", "")
                rating_tag = full_card.find_element(By.CSS_SELECTOR, "span[role='img']")
                stars = int(rating_tag.get_attribute("aria-label").replace(" stars", "").replace(" star", ""))
                rating_parent = rating_tag.find_element(By.XPATH, "..")
                review_date = rating_parent.find_elements(By.CSS_SELECTOR, "span")[-1].get_attribute("innerHTML")

                review_data = ReviewData(
                    name=name,
                    stars=stars,
                    time_left=review_date,
                    review_shortened=review
                )
                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
In the main, you can change any of the following constants to tweak your results:

- `MAX_RETRIES`: Defines the maximum number of attempts to retry a failed request during scraping or processing.
- `MAX_THREADS`: Specifies the maximum number of threads for concurrent processing of tasks.
- `LOCATION`: Determines the geographical location for the scraping requests.
- `LOCALITIES`: Provides the geographic coordinates or specific areas to focus the Google Maps search.
- `keyword_list`: Contains the list of keywords to search for on Google Maps (e.g., types of businesses such as "restaurants" or "cafes").

Here is an example of a Google Maps search URL:

`https://www.google.com/maps/search/restaurants/@42.3753166,-83.4750232,15z/data=!3m1!4b1?entry=ttu`

We're using the `search` endpoint. Our target location gets added on to the keyword endpoint. When our scraper constructs its URLs, they'll be laid out like this:

`https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu`
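To make that layout concrete, here is a minimal sketch (the helper name is our own, not part of the scraper) that builds a search URL from a keyword and a "latitude,longitude" locality string the same way the crawler does:

```python
def build_maps_search_url(keyword: str, locality: str) -> str:
    # Spaces in the keyword become "+" signs; the locality is a "latitude,longitude" pair
    formatted_keyword = keyword.replace(" ", "+")
    return f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"


print(build_maps_search_url("mexican restaurant", "42.3,-83.5"))
# https://www.google.com/maps/search/mexican+restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu
```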
We can fetch each of these pages with a simple `driver.get()`. As you can see, the popup contains a Reviews tab. This is where our individual reviews will come from.

Our business links are `a` tags descended from two `div` objects, so our link selector is `div div a`. Each review sits inside a `div` with the class of `MyEned`. Our review selector will look like this: `div[class='MyEned']`.
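These selectors can drift whenever Google updates its markup, so it's worth a quick, standalone check before building the full scraper. A minimal sketch (no proxy; results will vary with what Maps chooses to render for your IP):

```python
from selenium import webdriver
from selenium.webdriver.common.by import By

options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
try:
    # A search results page; business links are `a` tags nested under two `div`s
    driver.get("https://www.google.com/maps/search/restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu")
    links = driver.find_elements(By.CSS_SELECTOR, "div div a")
    named = [link.get_attribute("aria-label") for link in links if link.get_attribute("aria-label")]
    print(f"Found {len(links)} candidate links, {len(named)} with aria-labels")
    # The review selector, div[class='MyEned'], only matches on an individual business page
finally:
    driver.quit()
```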
To control our location through the ScrapeOps Proxy Aggregator, we pass a country code such as `us` into our `country` param. You can view a list of country codes below.

Country | Country Code |
---|---|
Brazil | br |
Canada | ca |
China | cn |
India | in |
Italy | it |
Japan | jp |
France | fr |
Germany | de |
Russia | ru |
Spain | es |
United States | us |
United Kingdom | uk |
Create a new project folder, then `cd` into the folder. Inside the folder, create a virtual environment, activate it, and install Selenium:

```bash
mkdir google-reviews-selenium
cd google-reviews-selenium
python -m venv venv
source venv/bin/activate
pip install selenium
```
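Selenium 4.6 and newer ships with Selenium Manager, which downloads a matching chromedriver automatically as long as Chrome itself is installed. If you'd like to confirm the install before going further, a quick check like this works:

```python
import selenium

# Expect something in the 4.x range; 4.6+ handles the chromedriver download for you
print(selenium.__version__)
```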
`start_scrape()` is used to invoke our actual parsing function, `scrape_search_results()`. We have some basic retry logic for the parser and some configuration variables inside of our `main`. The `main`
holds the actual runtime of the programimport osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, locality, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) response = driver.get(url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = { "name": name, "stars": rating, "url": maps_link, "rating_count": rating_count } print(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, retries=3): for locality in localities: scrape_search_results(keyword, location, locality, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, LOCATION, LOCALITIES, retries=MAX_RETRIES) aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
If you take a look at `scrape_search_results()`, you'll see the initial design we talked about in the understanding section.

- `driver.find_elements(By.CSS_SELECTOR, "div div a")` finds and returns all the `a` elements descended from two `div` elements on the page. This is one of the selectors we wrote earlier.
- We use `get_attribute()` to extract each restaurant name from the `aria-label` attribute.
- We pull each business link from its `href`.
- To get the `full_card` containing all of our data, we need to find the parent element of the business link. We use its XPath to do this: `business_link.find_element(By.XPATH, "..")`.
- Our ratings are held in `span` elements descended from another `span` with the `role` of `img`. Here is our selector: `span[role='img'] > span`. If there are ratings present, we extract both the `rating` and `rating_count` using the `innerHTML` from these items. With Selenium, the `text` method only reliably shows text that's displayed on the page. Instead of scrolling and waiting for this information to populate, we simply pull it from the HTML immediately.

At the moment, each result is just a `dict` object. `dict` is great for prototyping, but it doesn't always cover edge cases due to weak typing. `dict` allows for things like missing fields that could potentially corrupt our data. To handle this, we'll create a `SearchData` class. Then, we need a pipeline to pass all of these objects into our CSV file. For this, we'll create a `DataPipeline` class.

Take a look at `SearchData`. Nothing fancy, just the fields we extracted earlier coupled with a `check_string_fields()` method to ensure that we don't have any missing fields.

```python
@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
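As a quick usage example of the cleanup above (run after the class definition), an empty string gets a default label and padded strings get stripped:

```python
example = SearchData(name="", stars=4.5, url="  https://www.google.com/maps/place/example  ", rating_count=12)
print(example.name)  # No name
print(example.url)   # https://www.google.com/maps/place/example
```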
Next comes `DataPipeline`. This class actually does most of the heavy lifting. It comes with a variety of methods, all of which are very important to our actual storage.

To summarize it, our `SearchData` gets held inside the `storage_queue`. We can add new objects to the queue using the `add_data()` method. When `close_pipeline()` is invoked, the entire queue gets saved to a CSV file with `save_to_csv()`. One thing to watch: `close_pipeline()` calls `time.sleep()`, so the script also needs `import time` at the top.

```python
class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)  # requires `import time` at the top of the script
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
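Here is a minimal usage sketch (it assumes `SearchData`, `DataPipeline`, and the imports from the script above are already defined; the filename and business names are just examples):

```python
pipeline = DataPipeline(csv_filename="example-restaurants.csv", storage_queue_limit=2)

pipeline.add_data(SearchData(name="Taqueria Uno", stars=4.5, url="https://www.google.com/maps/place/1", rating_count=120))
pipeline.add_data(SearchData(name="Taqueria Uno", stars=4.5, url="https://www.google.com/maps/place/1", rating_count=120))  # duplicate name, gets dropped
pipeline.add_data(SearchData(name="Pizza Due", stars=4.0, url="https://www.google.com/maps/place/2", rating_count=85))

pipeline.close_pipeline()  # flushes whatever is left in the queue to example-restaurants.csv
```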
import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) response = driver.get(url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, retries=3): for locality in localities: scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
At the moment, `start_scrape()` is used to trigger our parser on an array of localities using a `for` loop. This gets the job done, but it processes these localities one at a time. Let's rewrite it using `ThreadPoolExecutor`.

Our new version opens a pool of threads and runs `scrape_search_results` on each thread simultaneously. It may look intimidating, but this function is much simpler than you might think. We pass lists of args into `ThreadPoolExecutor`, and it then takes each arg from each list and passes it into an individual instance of our target function (there is a small standalone demo of this pattern after the full code below).

```python
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )
```
- `scrape_search_results`: The function we wish to run multiple instances of.
- `[keyword] * len(localities)`, `[location] * len(localities)`, `[data_pipeline] * len(localities)`, `[retries] * len(localities)`: All of these get passed in as arrays the length of our `localities` list.
- `localities`
: The list we actually want to process.import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) response = driver.get(url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
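If the repeated-list pattern in `executor.map()` looks odd, here is a tiny standalone demo of how it lines the arguments up (the function and values here are invented purely for illustration):

```python
import concurrent.futures

def greet(greeting, name):
    return f"{greeting}, {name}!"

names = ["Detroit", "Novi", "Livonia"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Calls greet("Hello", "Detroit"), greet("Hello", "Novi"), greet("Hello", "Livonia") concurrently
    results = list(executor.map(greet, ["Hello"] * len(names), names))

print(results)  # ['Hello, Detroit!', 'Hello, Novi!', 'Hello, Livonia!']
```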
Our proxy function takes in a standard set of parameters (`api_key`, `url`, and `country`). In this particular case, we need to add another parameter as well: `wait`. `wait` tells Proxy Aggregator to wait an arbitrary amount of time for content to render. We then use URL encoding to wrap all of these parameters into a ScrapeOps proxied url.

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
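To see exactly what gets sent to the proxy, you can print one of these URLs out. The sketch below is self-contained, and the API key is a placeholder:

```python
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder, loaded from config.json in the real script

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,  # ask the proxy to wait up to 5 seconds for the page to render
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

print(get_scrapeops_url("https://www.google.com/maps/search/restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.google.com%2Fmaps%2F...&country=us&wait=5000
```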
import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
You can see our updated `main`
below. Feel free to change the configuration variables to better fit your results.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
```python
def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)

            info_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='MyEned']")
            for card in info_cards:
                review = card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML")
                full_card = card.find_element(By.XPATH, "../../../..")
                reviewer_button = full_card.find_element(By.CSS_SELECTOR, "button")
                name = reviewer_button.get_attribute("aria-label").replace("Photo of ", "")
                rating_tag = full_card.find_element(By.CSS_SELECTOR, "span[role='img']")
                stars = int(rating_tag.get_attribute("aria-label").replace(" stars", "").replace(" star", ""))
                rating_parent = rating_tag.find_element(By.XPATH, "..")
                review_date = rating_parent.find_elements(By.CSS_SELECTOR, "span")[-1].get_attribute("innerHTML")

                review_data = {
                    "name": name,
                    "stars": stars,
                    "time_left": review_date,
                    "review_shortened": review
                }
                print(review_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
- `card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML")` finds our actual review. Since it's not visible on the page, we need to extract its `innerHTML` instead of its `text`.
- `card.find_element(By.XPATH, "../../../..")` finds the full element holding our review. It's the parent of the parent of the parent of the parent of our `card` element... the "great-great-grandparent", if you will (there's a short XPath demo after the snippet below).
- We find the `button` element and extract the reviewer's name from its `aria-label`.
- Our ratings are held in `span` elements containing the `rating` and `rating_count`.
- We pull the rating from the `aria-label` of its holder element.
- We pull the `rating_count` from the text of its holder as well. Since the element isn't visible on the page, we once again use its `innerHTML` as opposed to its actual `text`.

`process_results()` will be used to trigger our scraping function. This function opens the CSV file and reads the rows into an array of `dict` objects. Each row then gets passed into `process_business()`.

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)
```
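Climbing ancestors with repeated `..` steps is easy to get off by one, so here is a tiny, self-contained illustration (the markup is a made-up data URL, not Google's):

```python
from selenium import webdriver
from selenium.webdriver.common.by import By

options = webdriver.ChromeOptions()
options.add_argument("--headless")

# Three nested divs around a span, so "../../.." climbs from the span to the outermost div
page = "data:text/html,<div id='outer'><div><div><span id='start'>hi</span></div></div></div>"

driver = webdriver.Chrome(options=options)
try:
    driver.get(page)
    start = driver.find_element(By.ID, "start")
    ancestor = start.find_element(By.XPATH, "../../..")
    print(ancestor.get_attribute("id"))  # outer
finally:
    driver.quit()
```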
import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) info_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='MyEned']") for card in info_cards: review = card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML") full_card = card.find_element(By.XPATH, "../../../..") reviewer_button = full_card.find_element(By.CSS_SELECTOR, "button") name = reviewer_button.get_attribute("aria-label").replace("Photo of ", "") rating_tag = full_card.find_element(By.CSS_SELECTOR, "span[role='img']") stars = int(rating_tag.get_attribute("aria-label").replace(" stars", "").replace(" star", "")) rating_parent = rating_tag.find_element(By.XPATH, "..") review_date = rating_parent.find_elements(By.CSS_SELECTOR, "span")[-1].get_attribute("innerHTML") review_data = { "name": name, "stars": stars, "time_left": review_date, "review_shortened": review } print(review_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries 
+= 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
We already have a `DataPipeline` class that takes in `dataclass` objects. Once again, we need to convert our extracted `dict` into a more strongly typed object.

In the snippet below, we create a `ReviewData` object. It uses the same methods for dealing with bad data; we just have some different fields: `name`, `stars`, `time_left`, and `review_shortened`.

- `time_left`: The time the review was left. Google doesn't always give us specific dates; sometimes they just give us a general time frame such as "2 months ago".
- `review_shortened`: This is the shortened version of the review that gets displayed when we look at the business popup.

```python
@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    time_left: str = ""
    review_shortened: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
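As a quick example of the cleanup this buys us (assuming `ReviewData` is defined as above, with sample values standing in for scraped ones):

```python
label = "4 stars"  # sample aria-label text from a rating element

review = ReviewData(
    name="Photo of Jane Doe".replace("Photo of ", ""),
    stars=int(label.replace(" stars", "").replace(" star", "")),
    time_left="2 months ago",
    review_shortened=""
)

print(review.name)              # Jane Doe
print(review.stars)             # 4
print(review.review_shortened)  # No review_shortened
```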
process_results()
. We then use our extracted data to create a ReviewData
object. This object then gets passed into the pipeline. Once we've parsed the reviews, we close the pipeline.import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" stars: int = 0 time_left: str = "" review_shortened: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) info_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='MyEned']") review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") for card in info_cards: review = card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML") full_card = card.find_element(By.XPATH, "../../../..") reviewer_button = full_card.find_element(By.CSS_SELECTOR, "button") name = reviewer_button.get_attribute("aria-label").replace("Photo of ", "") rating_tag = full_card.find_element(By.CSS_SELECTOR, "span[role='img']") stars = int(rating_tag.get_attribute("aria-label").replace(" stars", "").replace(" star", "")) rating_parent = rating_tag.find_element(By.XPATH, "..") review_date = rating_parent.find_elements(By.CSS_SELECTOR, "span")[-1].get_attribute("innerHTML") review_data = ReviewData( name=name, stars=stars, time_left=review_date, review_shortened=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: 
{e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
Just like we did with the crawler, we'll now use `ThreadPoolExecutor` to run our parsing function on multiple threads.

Take a look at our rewritten version below. Just like earlier, our first argument is the function we want to call: `process_business`. Next, we pass in our CSV file rows: `reader`. All other args once again get passed in as arrays the length of the list we want to process.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
To add proxy support, we only need to change the `driver.get()` line of the parsing function. This single change finishes all of our coding for this project.

```python
driver.get(get_scrapeops_url(url))
```
import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" stars: int = 0 time_left: str = "" review_shortened: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(get_scrapeops_url(url)) info_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='MyEned']") review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") for card in info_cards: review = card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML") full_card = card.find_element(By.XPATH, "../../../..") reviewer_button = full_card.find_element(By.CSS_SELECTOR, "button") name = reviewer_button.get_attribute("aria-label").replace("Photo of ", "") rating_tag = full_card.find_element(By.CSS_SELECTOR, "span[role='img']") stars = int(rating_tag.get_attribute("aria-label").replace(" stars", "").replace(" star", "")) rating_parent = rating_tag.find_element(By.XPATH, "..") review_date = rating_parent.find_elements(By.CSS_SELECTOR, "span")[-1].get_attribute("innerHTML") review_data = ReviewData( name=name, stars=stars, time_left=review_date, review_shortened=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: 
logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here is our finished `main`
. As always, feel free to change any of the config variables.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Because of our `wait` parameter, this is just about as fast as it can get.