Then check out ScrapeOps, the complete toolkit for web scraping.
config.json file.{"api_key": "your-super-secret-api-key"}.python name_of_your_python_file.py.import osimport reimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" stars: int = 0 time_left: str = "" review_shortened: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") business_links = soup.select("div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get("href") full_card = business_link.parent rating_holder = full_card.select_one("span[role='img']") rating = 0.0 rating_count = 0 if rating_holder: rating_array = rating_holder.text.split("(") rating = rating_array[0] rating_count = int(rating_array[1].replace(")", "").replace(",", "")) search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") main_card = soup.select_one("div[role='main']") info_cards = soup.find_all("div", class_="MyEned") review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") for card in info_cards: review = card.text full_card = card.parent.parent.parent.parent reviewer_button = full_card.find("button") name = reviewer_button.get("aria-label").replace("Photo of ", "") rating_tag = full_card.select_one("span[role='img']") stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", "")) review_date = rating_tag.parent.find_all("span")[-1].text review_data = ReviewData( name=name, stars=stars, time_left=review_date, review_shortened=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed 
Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
If you'd like to tweak your results, change any of the following constants inside `main`:

- `MAX_RETRIES`: the max number of retries for a parse.
- `MAX_THREADS`: how many threads you'd like to use when parsing pages simultaneously.
- `LOCATION`: the location you'd like to appear from.
- `LOCALITIES`: the areas of the map you'd like to scrape, added as latitude and longitude pairs.
- `keyword_list`: the keywords you'd like to search the map for.

For each business in the search results, the crawler pulls its `name`, `stars`, `url`, and `rating_count`. Then, we'll save these to a CSV file. Then, our review scraper will go through and find reviews for each of these businesses.

Our Maps crawler will need to do the following: perform a keyword search, parse the businesses from the results, store them in a CSV file, run these steps concurrently, and integrate with a proxy.

Before we build it, take a look at the URL of an individual business page on Google Maps:

```
https://www.google.com/maps/place/Leo's+Coney+Island/@42.3937072,-83.4828338,17z/data=!4m6!3m5!1s0x8824acedc1b6f397:0xaa85d06de541a352!8m2!3d42.3937072!4d-83.4828338!16s%2Fg%2F1tf299fd?authuser=0&hl=en&entry=ttu&g_ep=EgoyMDI0MDkwOC4wIKXMDSoASAFQAw%3D%3D
```
In this URL, `@42.3937072,-83.4828338` is our latitude and longitude. When we look up a specific restaurant like this, we get a page very similar to the search results: we get our map, and along with it, a section of the page containing the business information and reviews. In the search results, each business card contains an `a` tag with a link to the restaurant's information.

On the individual business page, the actual reviews are embedded within a `div` with a class of `MyEned`. Once we find this element, we can find its parent elements. Once we've found the correct parent element, we can find all of the other information we need.

When it comes to geolocation, our coordinates are already embedded right in the URL: `@42.3937072,-83.4828338`.
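Our crawler will build its search URLs the same way, by dropping a keyword and a latitude/longitude pair into Google's URL format. Here's a minimal sketch of that construction; the keyword, coordinates, and zoom level below are just example values:

```python
# Example values -- any keyword and "latitude,longitude" pair will do
keyword = "restaurant"
locality = "42.3,-83.5"

# Spaces in the keyword become "+" signs, and the locality slots in after the "@"
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
print(url)
```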
On top of that, we can control our location with the `country` param when using the ScrapeOps Proxy Aggregator. If we want to appear in the US, we simply pass `{"country": "us"}` to ScrapeOps.

To get started, create a new project folder and move into it:

```
mkdir google-reviews-scraper
cd google-reviews-scraper
```
Create a new virtual environment and activate it:

```
python -m venv venv
source venv/bin/activate
```

Install your dependencies:

```
pip install requests
pip install beautifulsoup4
```

We'll start with a simple crawler built around a parsing function, `scrape_search_results()`. Pay close attention to the parsing logic going on in this script.

```python
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")

            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent

                rating_holder = full_card.select_one("span[role='img']")

                rating = 0.0
                rating_count = 0

                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))

                search_data = {
                    "name": name,
                    "stars": rating,
                    "url": maps_link,
                    "rating_count": rating_count
                }
                print(search_data)

            success = True
            logger.info(f"Successfully parsed data from: {url}")

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, location, localities, retries=3):
    for locality in localities:
        scrape_search_results(keyword, location, locality, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        start_scrape(keyword, LOCATION, LOCALITIES, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
```
business_links = soup.select("div div a").business_link.get("aria-label").business_link.get("href") gives us the link to each business.parent element of the business link, full_card = business_link.parent.full_card.select_one("span[role='img']") finds our rating holder.dataclass to represent our search results. Then, we need a pipeline to a CSV.dataclass. We'll call it SearchData.@dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
Next, we need our `DataPipeline`. This class opens a pipe to a CSV file and filters out duplicate items by their `name` attribute. Note that `close_pipeline()` calls `time.sleep()`, so make sure `time` is imported at the top of your script.

```python
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
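As a quick illustration of how the pipeline is meant to be used, here's a hypothetical standalone snippet. It assumes the `SearchData` and `DataPipeline` classes above (and their imports) are already defined in the same file; the values are placeholders.

```python
# Hypothetical usage of DataPipeline with SearchData
pipeline = DataPipeline(csv_filename="example-output.csv")

pipeline.add_data(SearchData(name="Some Restaurant", stars=4.5, url="https://maps.google.com/...", rating_count=120))
# The same name again is treated as a duplicate and dropped with a warning
pipeline.add_data(SearchData(name="Some Restaurant", stars=4.5, url="https://maps.google.com/...", rating_count=120))

# Flush anything still sitting in the queue out to the CSV file
pipeline.close_pipeline()
```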
DataPipeline and pass it into start_scrape(). It then gets passed into scrape_search_results(). Instead of finding and printing our data as a dict object, we create a SearchData object and pass it into our DataPipeline.import osimport reimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") business_links = soup.select("div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get("href") full_card = business_link.parent rating_holder = full_card.select_one("span[role='img']") rating = 0.0 rating_count = 0 if rating_holder: rating_array = rating_holder.text.split("(") rating = rating_array[0] rating_count = int(rating_array[1].replace(")", "").replace(",", "")) search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, retries=3): for locality in localities: scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
In the code above:

- `SearchData` is used to represent individual search results from our crawl.
- `DataPipeline` is used to pipe all of our `SearchData` objects to a CSV file and remove the duplicates.

`start_scrape()` already allows us to crawl a list of different localities. To crawl this list concurrently, we just need to refactor `start_scrape()` and replace the `for` loop with something a little more powerful. We'll do this using `ThreadPoolExecutor`, which opens up a new pool of threads and runs our parsing function on each thread concurrently.

Here is our old version of `start_scrape()`:

```python
def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
    for locality in localities:
        scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries)
```
And here is the new, multithreaded version:

```python
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )
```
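If the repeated-list arguments look odd, here's a tiny standalone sketch (not part of our scraper) of how `executor.map()` lines the lists up: the first element of every list becomes the arguments for the first call, the second elements form the second call, and so on.

```python
import concurrent.futures

# Hypothetical stand-in for scrape_search_results(), just to show the call pattern
def fake_scrape(keyword, location, locality, retries):
    print(f"scraping '{keyword}' near {locality} ({location}), retries={retries}")

localities = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(
        fake_scrape,
        ["restaurant"] * len(localities),   # same keyword for every call
        ["us"] * len(localities),           # same location for every call
        localities,                         # one locality per call
        [3] * len(localities)               # same retry limit for every call
    )
```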
`executor.map()` is the portion that actually replaces the `for` loop. Take a look at the args:

- `scrape_search_results`: the function we want to call.
- `[keyword] * len(localities)`: our keyword passed in as a list.
- `[location] * len(localities)`: our location passed in as a list.
- `localities`: the list of localities we'd like to crawl.
- `[data_pipeline] * len(localities)`: our `DataPipeline` object passed in as a list.
- `[retries] * len(localities)`: our retry limit passed in as a list.

`executor.map()` takes these lists and passes them into a bunch of separate instances of our parsing function.

Next, we need to route our requests through a proxy and wait for our content to render. We need to tell the ScrapeOps Proxy Aggregator the following four things when making our requests:

- `"api_key"`: your ScrapeOps API key.
- `"url"`: the URL we want to scrape.
- `"country"`: the country we want our request to be routed through. This parameter uses a location of our choice when we make the request.
- `"wait"`: how long to wait before sending our response back. This allows the content to render on their end before we get it.

The function below handles all of this:

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
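To see what actually gets requested, you can call `get_scrapeops_url()` on a target URL and inspect the result. This is just a usage sketch: it assumes the function above (with its `urlencode` import) is already in scope and that your API key has been loaded from `config.json`; the target URL is an example value.

```python
# Example target URL shaped like our Google Maps search URLs
target_url = "https://www.google.com/maps/search/restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu"

print(get_scrapeops_url(target_url, location="us"))
# Prints a https://proxy.scrapeops.io/v1/ URL with api_key, url, country,
# and wait encoded as query parameters.
```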
import osimport reimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") business_links = soup.select("div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get("href") full_card = business_link.parent rating_holder = full_card.select_one("span[role='img']") rating = 0.0 rating_count = 0 if rating_holder: rating_array = rating_holder.text.split("(") rating = rating_array[0] rating_count = int(rating_array[1].replace(")", "").replace(",", "")) search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Now it's time to run our finished crawler. Here is our `main` block:

```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
As before, feel free to change any of these constants to tweak your results:

- `MAX_RETRIES`: the max number of retries for a parse.
- `MAX_THREADS`: how many threads you'd like to use when parsing pages simultaneously.
- `LOCATION`: the location you'd like to appear from.
- `LOCALITIES`: the areas of the map you'd like to scrape, added as latitude and longitude pairs.
- `keyword_list`: the keywords you'd like to search the map for.

Now it's time to scrape the reviews for each business we saved during the crawl. We'll start with a parsing function, `process_business()`, which fetches an individual business page and pulls out its reviews:

```python
def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                main_card = soup.select_one("div[role='main']")
                info_cards = soup.find_all("div", class_="MyEned")

                for card in info_cards:
                    review = card.text
                    full_card = card.parent.parent.parent.parent
                    reviewer_button = full_card.find("button")
                    name = reviewer_button.get("aria-label").replace("Photo of ", "")
                    rating_tag = full_card.select_one("span[role='img']")
                    stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))
                    review_date = rating_tag.parent.find_all("span")[-1].text

                    review_data = {
                        "name": name,
                        "stars": stars,
                        "time_left": review_date,
                        "review_shortened": review
                    }
                    print(review_data)

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
Here's how the review parsing works:

- We find all of the `info_card` items: `info_cards = soup.find_all("div", class_="MyEned")`.
- The review text comes straight from each card: `review = card.text`.
- We climb the `parent` attribute to find the full review card that includes the reviewer name and rating: `full_card = card.parent.parent.parent.parent`.
- `reviewer_button = full_card.find("button")` finds the button that holds information about our reviewer.
- The reviewer's name comes from its `aria-label` attribute: `name = reviewer_button.get("aria-label").replace("Photo of ", "")`. We also remove `"Photo of "` from the string that includes their name; this way, the only information we're saving is the reviewer name (there's a small sanity-check sketch of these string cleanups after the next code block).
- The star rating also comes from an `aria-label`: `int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", ""))`.
- `review_date = rating_tag.parent.find_all("span")[-1].text` finds all the `span` tags descended from the parent of our `rating_tag`. The last element is our review date, so we pull index `-1` from the array.

Next, we need a function similar to `start_scrape()`. This one needs to read our CSV file into an array of `dict` objects. Then, it should iterate through the array and call our parsing function on each row we read from the file.

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)
```
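Two of the string cleanups used in `process_business()` are easy to sanity-check in isolation. Here's a small sketch using made-up attribute values shaped like the ones the parser reads:

```python
# Hypothetical aria-label values, shaped like the attributes Google returns
reviewer_label = "Photo of Jane Doe"
rating_label = "4 stars"

name = reviewer_label.replace("Photo of ", "")                          # "Jane Doe"
stars = int(rating_label.replace(" stars", "").replace(" star", ""))    # 4

print(name, stars)
```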
import osimport reimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") business_links = soup.select("div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get("href") full_card = business_link.parent rating_holder = full_card.select_one("span[role='img']") rating = 0.0 rating_count = 0 if rating_holder: rating_array = rating_holder.text.split("(") rating = rating_array[0] rating_count = int(rating_array[1].replace(")", "").replace(",", "")) search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") main_card = soup.select_one("div[role='main']") info_cards = soup.find_all("div", class_="MyEned") for card in info_cards: review = card.text full_card = card.parent.parent.parent.parent reviewer_button = full_card.find("button") name = reviewer_button.get("aria-label").replace("Photo of ", "") rating_tag = full_card.select_one("span[role='img']") stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", "")) review_date = rating_tag.parent.find_all("span")[-1].text review_data = { "name": name, "stars": stars, "time_left": review_date, "review_shortened": review } print(review_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to 
process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
We already have a working `DataPipeline` class, which makes our new storage really easy to implement: we just need to pass another `dataclass` into a `DataPipeline`. This new class will be used to represent reviews from the page.

Take a look at `ReviewData`; it's almost identical to `SearchData`.

```python
@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    time_left: str = ""
    review_shortened: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
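Like `SearchData`, `ReviewData` cleans its own string fields when it's constructed: empty strings get a default placeholder and everything else is stripped. A quick illustration with placeholder values, assuming the dataclass above is already defined:

```python
# Assumes the ReviewData dataclass above is defined in the same file
review = ReviewData(name="  Jane Doe  ", stars=5, time_left="", review_shortened="Great food!")

print(review.name)        # "Jane Doe" -- surrounding whitespace stripped
print(review.time_left)   # "No time_left" -- empty string replaced with a default
```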
DataPipeline from inside our parsing function. Then, as we extract our data, we convert it into ReviewData. That ReviewData then gets passed into the DataPipeline as we parse it.import osimport reimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" stars: int = 0 time_left: str = "" review_shortened: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") business_links = soup.select("div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get("href") full_card = business_link.parent rating_holder = full_card.select_one("span[role='img']") rating = 0.0 rating_count = 0 if rating_holder: rating_array = rating_holder.text.split("(") rating = rating_array[0] rating_count = int(rating_array[1].replace(")", "").replace(",", "")) search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") main_card = soup.select_one("div[role='main']") info_cards = soup.find_all("div", class_="MyEned") review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") for card in info_cards: review = card.text full_card = card.parent.parent.parent.parent reviewer_button = full_card.find("button") name = reviewer_button.get("aria-label").replace("Photo of ", "") rating_tag = full_card.select_one("span[role='img']") stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", "")) review_date = rating_tag.parent.find_all("span")[-1].text review_data = ReviewData( name=name, stars=stars, time_left=review_date, review_shortened=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: 
{response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
To speed up the review scraper, we'll use `ThreadPoolExecutor` just like we did before. We'll replace the `for` loop in `process_results()` with some more powerful, multithreaded code.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
process_business is the function we want to call on all threads.response = requests.get(get_scrapeops_url(url, location=location))import osimport reimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" stars: int = 0 time_left: str = "" review_shortened: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") business_links = soup.select("div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get("href") full_card = business_link.parent rating_holder = full_card.select_one("span[role='img']") rating = 0.0 rating_count = 0 if rating_holder: rating_array = rating_holder.text.split("(") rating = rating_array[0] rating_count = int(rating_array[1].replace(")", "").replace(",", "")) search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") main_card = soup.select_one("div[role='main']") info_cards = soup.find_all("div", class_="MyEned") review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") for card in info_cards: review = card.text full_card = card.parent.parent.parent.parent reviewer_button = full_card.find("button") name = reviewer_button.get("aria-label").replace("Photo of ", "") rating_tag = full_card.select_one("span[role='img']") stars = int(rating_tag.get("aria-label").replace(" stars", "").replace(" star", "")) review_date = rating_tag.parent.find_all("span")[-1].text review_data = ReviewData( name=name, stars=stars, time_left=review_date, review_shortened=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed 
Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5","42.35,-83.5", "42.4,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
You can see our `main` again below:

```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
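These constants are the only things you need to touch to customize a run. For example, a hypothetical run that searches two keywords across a couple of Detroit-area coordinate pairs might look like this (swap in your own keywords and coordinates):

```python
# Hypothetical configuration -- example values only
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5", "42.35,-83.5"]

keyword_list = ["restaurant", "coffee shop"]
```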
When you scrape Google Maps, pay attention to Google's terms of service and `robots.txt`. Violating these can lead to suspension and even deletion of your account. You can view these documents from Google Maps below.

Then check out ScrapeOps, the complete toolkit for web scraping.
config.json file with your ScrapeOps API key.import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" stars: int = 0 time_left: str = "" review_shortened: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(get_scrapeops_url(url)) info_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='MyEned']") review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") for card in info_cards: review = card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML") full_card = card.find_element(By.XPATH, "../../../..") reviewer_button = full_card.find_element(By.CSS_SELECTOR, "button") name = reviewer_button.get_attribute("aria-label").replace("Photo of ", "") rating_tag = full_card.find_element(By.CSS_SELECTOR, "span[role='img']") stars = int(rating_tag.get_attribute("aria-label").replace(" stars", "").replace(" star", "")) rating_parent = rating_tag.find_element(By.XPATH, "..") review_date = rating_parent.find_elements(By.CSS_SELECTOR, "span")[-1].get_attribute("innerHTML") review_data = ReviewData( name=name, stars=stars, time_left=review_date, review_shortened=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: 
logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
- MAX_RETRIES: Defines the maximum number of attempts to retry a failed request during scraping or processing.
- MAX_THREADS: Specifies the maximum number of threads for concurrent processing of tasks.
- LOCATION: Determines the geographical location for the scraping requests.
- LOCALITIES: Provides the geographic coordinates or specific areas to focus the Google Maps search.
- keyword_list: Contains the list of keywords to search for on Google Maps (e.g., types of businesses such as "restaurants" or "cafes").

https://www.google.com/maps/search/restaurants/@42.3753166,-83.4750232,15z/data=!3m1!4b1?entry=ttu
search endpoint. Our target location gets added onto the keyword endpoint. When our scraper constructs its URLs, they'll be laid out like this:

https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu
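To make that layout concrete, here is a minimal, self-contained sketch of the string formatting involved. The build_search_url() helper name is ours for illustration only; the scraper itself builds the URL inline inside scrape_search_results().

```python
# Minimal sketch -- the helper name is ours; the real scraper does this inline.
def build_search_url(keyword: str, locality: str) -> str:
    formatted_keyword = keyword.replace(" ", "+")
    return (
        f"https://www.google.com/maps/search/{formatted_keyword}"
        f"/@{locality},14z/data=!3m1!4b1?entry=ttu"
    )

print(build_search_url("mexican restaurant", "42.3,-83.5"))
# https://www.google.com/maps/search/mexican+restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu
```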
driver.get().

As you can see, the popup contains a Reviews tab. This is where our individual reviews will come from.

On the search results page, each business link is an a tag descended from two div objects, so our result selector is div div a. Each review sits inside a div with the class of MyEned, so our review selector will look like this: div[class='MyEned'].

To control our geolocation, we pass a country code such as us into our country param. You can view a list of country codes below.

| Country | Country Code |
|---|---|
| Brazil | br |
| Canada | ca |
| China | cn |
| India | in |
| Italy | it |
| Japan | jp |
| France | fr |
| Germany | de |
| Russia | ru |
| Spain | es |
| United States | us |
| United Kingdom | uk |
cd into the folder.

```
mkdir google-reviews-selenium
cd google-reviews-selenium
```
python -m venv venvsource venv/bin/activatepip install seleniumstart_scrape() is used to invoke our actual parsing function, scrape_search_results().We have some basic retry logic for the parser and some configuration variables inside of our main. The main holds the actual runtime of the programimport osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, locality, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) response = driver.get(url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = { "name": name, "stars": rating, "url": maps_link, "rating_count": rating_count } print(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, retries=3): for locality in localities: scrape_search_results(keyword, location, locality, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, LOCATION, LOCALITIES, retries=MAX_RETRIES) aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
scrape_search_results(), you'll see the initial design we talked about in the understanding section.

- driver.find_elements(By.CSS_SELECTOR, "div div a") finds and returns all the a elements descended from two div elements on the page. This is one of the selectors we wrote earlier.
- We use get_attribute() to extract each restaurant name from the aria-label attribute.
- The link to the business comes from the href attribute.
- To get the full_card containing all of our data, we need to find the parent element of the business link. We use its XPath to do this: business_link.find_element(By.XPATH, "..").
- The ratings are held in span elements descended from another span with the role img. Here is our selector: span[role='img'] > span. If there are ratings present, we extract both the rating and rating_count using the innerHTML from these items. With Selenium, the text method only reliably shows text that's displayed on the page. Instead of scrolling and waiting for this information to populate, we simply pull it from the HTML immediately.

At the moment, each result is stored in a dict object. dict is great for prototyping, but it doesn't always cover edge cases due to weak typing. dict allows for things like missing fields that could potentially corrupt our data.

To handle this, we'll create a SearchData class. Then, we need a pipeline to pass all of these objects into our CSV file. For this, we'll create a DataPipeline class.

Take a look at SearchData. Nothing fancy, just the fields we extracted earlier coupled with a check_string_fields() method to ensure that we don't have any missing fields.

```python
@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
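As a quick sanity check (assuming the SearchData class above), you can see check_string_fields() at work on both an empty field and one with stray whitespace:

```python
# Usage sketch -- assumes the SearchData class defined above.
record = SearchData(name="  Lucy's Diner  ", stars=4.5, url="", rating_count=120)

print(record.name)  # "Lucy's Diner" -- leading/trailing spaces stripped
print(record.url)   # "No url" -- empty strings get a readable default label
```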
DataPipeline. This class actually does most of the heavy lifting. It comes with a variety of methods, all of which are very important to our actual storage.

To summarize it, our SearchData gets held inside the storage_queue. We can add new objects to the queue using the add_data() method. When close_pipeline() is invoked, the entire queue gets saved to a CSV file with save_to_csv().

```python
import time  # close_pipeline() uses time.sleep(); make sure this import is at the top of your script


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
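To see how these pieces fit together, here is a short usage sketch. It assumes the DataPipeline class above along with the SearchData class and logger defined earlier; the filename and values are just examples.

```python
# Usage sketch -- assumes DataPipeline, SearchData, and logger from above.
pipeline = DataPipeline(csv_filename="example-restaurants.csv", storage_queue_limit=50)

pipeline.add_data(SearchData(name="Lucy's Diner", stars=4.5, url="https://www.google.com/maps/place/...", rating_count=120))
# Adding the same name again gets logged as a duplicate and dropped.
pipeline.add_data(SearchData(name="Lucy's Diner", stars=4.5, url="https://www.google.com/maps/place/...", rating_count=120))

# Flush whatever is left in the queue out to example-restaurants.csv.
pipeline.close_pipeline()
```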
import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) response = driver.get(url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, retries=3): for locality in localities: scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
start_scrape() is used to trigger our parser on an array of localities using a for loop. This gets the job done, but it processes these localities one at a time. Let's rewrite it using ThreadPoolExecutor.

Our new version opens a pool of threads and runs scrape_search_results on each thread simultaneously. It may look intimidating, but this function is much simpler than you might think. We pass lists of args into ThreadPoolExecutor and it then takes each arg from the list and passes it into an individual instance of our target function.

```python
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )
```
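Before we go through each argument, here is a tiny, self-contained sketch (with a stand-in greet() function of our own) showing how executor.map() pairs those repeated lists up, taking one element from each list per call:

```python
import concurrent.futures

# Stand-in worker for illustration -- not part of the scraper.
def greet(keyword, location, locality):
    return f"{keyword} near {locality} ({location})"

localities = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        greet,
        ["restaurant"] * len(localities),  # same keyword for every call
        ["us"] * len(localities),          # same location for every call
        localities,                        # one locality per call
    )
    for line in results:
        print(line)
```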
scrape_search_results: The function we wish to run multiple instances of.[keyword] * len(localities), [location] * len(localities), [data_pipeline] * len(localities), [retries] * len(localities): All of these get passed in as arrays the length of our localities list.localities: The list we actually want to process.import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) response = driver.get(url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
api_key, url, and country).

In this particular case, we need to add another parameter as well: wait. wait tells Proxy Aggregator to wait an arbitrary amount of time for content to render. We then use URL encoding to wrap all of these parameters into a ScrapeOps proxied URL.

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
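As a quick usage sketch (assuming the get_scrapeops_url() function above and a placeholder API key), here is what a proxied URL looks like for one of our search pages; the printed output is abbreviated:

```python
# Usage sketch -- assumes get_scrapeops_url() from above; the key is a placeholder.
API_KEY = "your-super-secret-api-key"

target_url = "https://www.google.com/maps/search/restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu"
print(get_scrapeops_url(target_url, location="uk"))
# https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.google.com%2Fmaps%2Fsearch%2F...&country=uk&wait=5000
```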
import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
main below. Feel free to change the configuration variables to better fit your results.

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
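For example, a hypothetical configuration like the one below would crawl two keywords across two coordinate pairs and produce restaurant.csv and coffee-shop.csv (plus, later, one review file per business found). The values are only examples; swap in your own coordinates and keywords.

```python
# Example configuration only -- adjust to your own search area and keywords.
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
LOCALITIES = ["42.3,-83.5", "42.35,-83.5"]    # two nearby coordinate pairs
keyword_list = ["restaurant", "coffee shop"]  # yields restaurant.csv and coffee-shop.csv
```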
```python
def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)

            info_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='MyEned']")
            for card in info_cards:
                review = card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML")
                full_card = card.find_element(By.XPATH, "../../../..")
                reviewer_button = full_card.find_element(By.CSS_SELECTOR, "button")
                name = reviewer_button.get_attribute("aria-label").replace("Photo of ", "")
                rating_tag = full_card.find_element(By.CSS_SELECTOR, "span[role='img']")
                stars = int(rating_tag.get_attribute("aria-label").replace(" stars", "").replace(" star", ""))
                rating_parent = rating_tag.find_element(By.XPATH, "..")
                review_date = rating_parent.find_elements(By.CSS_SELECTOR, "span")[-1].get_attribute("innerHTML")

                review_data = {
                    "name": name,
                    "stars": stars,
                    "time_left": review_date,
                    "review_shortened": review
                }
                print(review_data)

            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
- card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML") finds our actual review. Since it's not visible on the page, we need to extract its innerHTML instead of its text.
- card.find_element(By.XPATH, "../../../..") finds the full element holding our review. It's the parent of the parent of the parent of the parent of our card element... The "great great grandparent" if you will.
- From the full card, we find the button element and extract the reviewer's name from its aria-label.
- We find the span[role='img'] element that holds the star rating and pull the stars from its aria-label.
- The review date comes from the last span inside that element's parent. Since the element isn't visible on the page, we once again use its innerHTML as opposed to its actual text.

process_results() will be used to trigger our scraping function. This function opens the CSV file and reads the rows into an array of dict objects. Each row then gets passed into process_business().

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)
```
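To picture the rows that process_business() receives, here is a small, self-contained sketch of what csv.DictReader() yields; the file contents are made up to match our SearchData columns.

```python
import csv
import io

# Fake crawl output with the same columns our SearchData rows produce.
fake_csv = io.StringIO(
    "name,stars,url,rating_count\n"
    "Lucy's Diner,4.5,https://www.google.com/maps/place/...,120\n"
)

reader = list(csv.DictReader(fake_csv))
print(reader[0]["name"])  # Lucy's Diner
print(reader[0]["url"])   # the Google Maps link process_business() will open
```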
import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) info_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='MyEned']") for card in info_cards: review = card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML") full_card = card.find_element(By.XPATH, "../../../..") reviewer_button = full_card.find_element(By.CSS_SELECTOR, "button") name = reviewer_button.get_attribute("aria-label").replace("Photo of ", "") rating_tag = full_card.find_element(By.CSS_SELECTOR, "span[role='img']") stars = int(rating_tag.get_attribute("aria-label").replace(" stars", "").replace(" star", "")) rating_parent = rating_tag.find_element(By.XPATH, "..") review_date = rating_parent.find_elements(By.CSS_SELECTOR, "span")[-1].get_attribute("innerHTML") review_data = { "name": name, "stars": stars, "time_left": review_date, "review_shortened": review } print(review_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries 
+= 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
DataPipeline class that takes in dataclass objects. Once again, we need to convert our extracted dict into a more strongly typed object.

In the snippet below, we create a ReviewData object. It uses the same methods for dealing with bad data; we just have some different fields: name, stars, time_left, and review_shortened.

- time_left: The time the review was left. Google doesn't always give us specific dates; sometimes they just give us a general time frame such as 2 months ago.
- review_shortened: This is the shortened version of the review that gets displayed when we look at the business popup.

```python
@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    time_left: str = ""
    review_shortened: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
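As with SearchData, empty strings get filled with a readable default. A quick sketch (assuming the ReviewData class above, with made-up values):

```python
# Usage sketch -- assumes the ReviewData class defined above.
review = ReviewData(name="Jane D.", stars=4, time_left="2 months ago", review_shortened="")

print(review.time_left)         # "2 months ago"
print(review.review_shortened)  # "No review_shortened" -- empty strings get a default label
```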
process_results(). We then use our extracted data to create a ReviewData object. This object then gets passed into the pipeline. Once we've parsed the reviews, we close the pipeline.import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" stars: int = 0 time_left: str = "" review_shortened: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) info_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='MyEned']") review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") for card in info_cards: review = card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML") full_card = card.find_element(By.XPATH, "../../../..") reviewer_button = full_card.find_element(By.CSS_SELECTOR, "button") name = reviewer_button.get_attribute("aria-label").replace("Photo of ", "") rating_tag = full_card.find_element(By.CSS_SELECTOR, "span[role='img']") stars = int(rating_tag.get_attribute("aria-label").replace(" stars", "").replace(" star", "")) rating_parent = rating_tag.find_element(By.XPATH, "..") review_date = rating_parent.find_elements(By.CSS_SELECTOR, "span")[-1].get_attribute("innerHTML") review_data = ReviewData( name=name, stars=stars, time_left=review_date, review_shortened=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: 
{e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
ThreadPoolExecutor to run our parsing function on multiple threads.

Take a look at our rewritten version below. Just like earlier, our first argument is the function we want to call: process_business. Next, we pass in our CSV file: reader. All other args once again get passed in as arrays the length of the list we want to process.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
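One caveat worth knowing about executor.map(): exceptions raised inside a worker are only re-raised in the caller when you iterate over the results. Since our pipeline ignores the return values, a business page that exhausts its retries shows up in the logs but does not stop the run. The self-contained sketch below (with a toy worker() of our own) shows how you could surface those errors if you wanted to:

```python
import concurrent.futures

# Toy worker for illustration -- not part of the scraper.
def worker(locality):
    if locality == "bad,coords":
        raise ValueError(f"could not process {locality}")
    return locality

localities = ["42.3,-83.5", "bad,coords"]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(worker, localities)
    try:
        # Iterating the results is what re-raises worker exceptions here;
        # without this loop the failure would pass silently.
        for result in results:
            print(result)
    except ValueError as e:
        print(f"A worker failed: {e}")
```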
driver.get() line of the parsing function. This single change finishes all of our coding for this project.driver.get(get_scrapeops_url(url))import osimport csvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" rating_count: int = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" stars: int = 0 time_left: str = "" review_shortened: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) business_links = driver.find_elements(By.CSS_SELECTOR, "div div a") excluded_words = ["Sign in"] for business_link in business_links: name = business_link.get_attribute("aria-label") if not name or name in excluded_words or "Visit" in name: continue maps_link = business_link.get_attribute("href") full_card = business_link.find_element(By.XPATH, "..") rating_holders = full_card.find_elements(By.CSS_SELECTOR, "span[role='img'] > span") rating = 0.0 rating_count = 0 has_rating = rating_holders[0].get_attribute("innerHTML") if has_rating: rating = has_rating rating_count = rating_holders[1].get_attribute("innerHTML").replace("(", "").replace(")", "") search_data = SearchData( name=name, stars=rating, url=maps_link, rating_count=rating_count ) data_pipeline.add_data(search_data) success = True logger.info(f"Successfully parsed data from: {url}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * len(localities), [location] * len(localities), localities, [data_pipeline] * len(localities), [retries] * len(localities) ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(get_scrapeops_url(url)) info_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='MyEned']") review_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") for card in info_cards: review = card.find_element(By.CSS_SELECTOR, "span").get_attribute("innerHTML") full_card = card.find_element(By.XPATH, "../../../..") reviewer_button = full_card.find_element(By.CSS_SELECTOR, "button") name = reviewer_button.get_attribute("aria-label").replace("Photo of ", "") rating_tag = full_card.find_element(By.CSS_SELECTOR, "span[role='img']") stars = int(rating_tag.get_attribute("aria-label").replace(" stars", "").replace(" star", "")) rating_parent = rating_tag.find_element(By.XPATH, "..") review_date = rating_parent.find_elements(By.CSS_SELECTOR, "span")[-1].get_attribute("innerHTML") review_data = ReviewData( name=name, stars=stars, time_left=review_date, review_shortened=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: 
logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" LOCALITIES = ["42.3,-83.5"] logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurant"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
main. As always, feel free to change any of the config variables.

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
wait parameter, this is just about as fast as it can get.