Then check out ScrapeOps, the complete toolkit for web scraping.
Before running the scraper, create a config.json file with your ScrapeOps Proxy API keys. Then run the script:

```
python name_of_your_script.py
```
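If you're unsure of the shape of that file, here is a minimal sketch. The only key the script actually reads is `api_key`; the value shown is a placeholder.

```
{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}
```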
```
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class BusinessData:
    name: str = ""
    street_address: str = ""
    city: str = ""
    state_and_zip: str = ""
    sunday: str = ""
    monday: str = ""
    tuesday: str = ""
    wednesday: str = ""
    thursday: str = ""
    friday: str = ""
    saturday: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )

def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False
    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                business_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
                info_cards = soup.find_all("div")
                for card in info_cards:
                    aria_label = card.get("aria-label")
                    if not aria_label:
                        continue
                    if "Information" not in aria_label:
                        continue
                    print("card exists")
                    button = card.find("button")
                    address = button.text.replace("", "")
                    address_array = address.split(",")
                    street_address = address_array[0]
                    city = address_array[1]
                    state_and_zip = address_array[2]

                    sunday = ""
                    monday = ""
                    tuesday = ""
                    wednesday = ""
                    thursday = ""
                    friday = ""
                    saturday = ""

                    hours_cards = card.find_all("tr")
                    for card in hours_cards:
                        row_text = card.text
                        if "Sunday" in row_text:
                            sunday = row_text.replace("Sunday", "")
                        elif "Monday" in row_text:
                            monday = row_text.replace("Monday", "")
                        elif "Tuesday" in row_text:
                            tuesday = row_text.replace("Tuesday", "")
                        elif "Wednesday" in row_text:
                            wednesday = row_text.replace("Wednesday", "")
                        elif "Thursday" in row_text:
                            thursday = row_text.replace("Thursday", "")
                        elif "Friday" in row_text:
                            friday = row_text.replace("Friday", "")
                        elif "Saturday" in row_text:
                            saturday = row_text.replace("Saturday", "")
                        else:
                            continue

                    business_data = BusinessData(
                        name=row["name"],
                        street_address=street_address,
                        city=city,
                        state_and_zip=state_and_zip,
                        sunday=sunday,
                        monday=monday,
                        tuesday=tuesday,
                        wednesday=wednesday,
                        thursday=thursday,
                        friday=friday,
                        saturday=saturday
                    )
                    business_pipeline.add_data(business_data)
                    break
                business_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
- `MAX_RETRIES`: the max amount of retries for a parse.
- `MAX_THREADS`: how many threads you'd like to use when parsing pages simultaneously.
- `LOCATION`: the location you'd like to appear from.
- `LOCALITIES`: the areas of the map you'd like to scrape. They need to be added in as latitude and longitude pairs.
- `keyword_list`: the keywords you'd like to search the map for, in this instance `restaurant`.
Here is an example of a Google Maps search URL:

```
https://www.google.com/maps/search/Restaurants/@42.3753166,-83.4750232,15z/data=!3m1!4b1?entry=ttu
```

- `42.3753166,-83.4750232` represents the area we're searching: `42.3753166` is our latitude and `-83.4750232` is our longitude.
- `/Restaurants` tells Google that we'd like to search for restaurants.

Because Google Maps loads its results dynamically, we give the page time to render by using the `wait` parameter with the ScrapeOps Proxy API.
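The crawler below builds this URL from the search keyword and a locality string. Here is that construction in isolation; the keyword and coordinates are just example values:

```
keyword = "restaurant"
locality = "42.3753166,-83.4750232"  # example "latitude,longitude" pair
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
print(url)
```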
If you look at the image below, you can see a highlighted `a` element. Within that element, the business name is embedded as the `aria-label`.
Each business's information sits inside a `div` card. That card carries an `aria-label` that begins with the word "Information" followed by the name of the business. We find all of the `div`s and then filter through them until we find one with an `aria-label` containing the word "Information".
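In code, that filter looks roughly like the sketch below. It mirrors the check used in `process_business()` later on; the `html` variable is assumed to hold a page you've already fetched.

```
from bs4 import BeautifulSoup

soup = BeautifulSoup(html, "html.parser")  # html: previously fetched page source

info_card = None
for card in soup.find_all("div"):
    aria_label = card.get("aria-label")
    # Skip divs with no aria-label, or with an unrelated one
    if not aria_label or "Information" not in aria_label:
        continue
    info_card = card
    break
```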
To control where our requests come from, we pass a `country` param into the ScrapeOps API. This will route us through servers in a country of our choosing. We also pass a `locality` parameter when building our URL. This `locality` will be made from the latitude and longitude points you saw earlier.

Run the following commands to create a project folder, set up a virtual environment, and install our dependencies:

```
mkdir google-maps-scraper
cd google-maps-scraper

python -m venv venv
source venv/bin/activate

pip install requests
pip install beautifulsoup4
```
To find each business, we look for `a` elements descended from at least two `div` elements. Then, we use some logic to filter out the links we don't want.
After we've found a target link, we find its parent element and pull the rest of our data from the parent element.
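Here is that idea in isolation, condensed from the crawler we build below. It assumes `soup` already holds a parsed results page.

```
business_links = soup.select("div div a")          # a elements nested under two divs
for business_link in business_links:
    name = business_link.get("aria-label")
    if not name or name == "Sign in" or "Visit" in name:
        continue                                   # drop links that aren't businesses
    maps_link = business_link.get("href")
    full_card = business_link.parent               # the card wrapping the link
    rating_holder = full_card.select_one("span[role='img']")
```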
Here is the code we'll start with.
```
import os
import re
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, locality, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
                search_data = {
                    "name": name,
                    "stars": rating,
                    "url": maps_link,
                    "rating_count": rating_count
                }
            logger.info(f"Successfully parsed data from: {url}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
    for locality in localities:
        scrape_search_results(keyword, location, locality, retries=retries)

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        start_scrape(keyword, LOCATION, LOCALITIES, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
```
```
https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu
```
- `formatted_keyword` is the search we're performing.
- `locality` holds the latitude and longitude coordinates for the area we'd like to scrape.
- `soup.select("div div a")` finds all of our link elements.
- We use an `if` and `continue` statement to filter out unwanted links.
- `business_link.get("aria-label")` pulls the name of the business.
- `business_link.get("href")` finds the link to the site.
- By default, we set `rating` and `rating_count` to zero.
- If a rating is present, we extract `rating` and `rating_count` with some string splitting and save them to their respective variables.

Now that we're extracting our data, we need somewhere to put it. We'll start with a `dataclass`, `SearchData`. This class will simply hold data that needs to be stored.
```
@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
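As a quick, hypothetical illustration, the `__post_init__` hook strips stray whitespace and replaces empty strings with a default:

```
item = SearchData(name="  Joe's Diner  ", stars=4.5, url="", rating_count=120)
print(item.name)  # "Joe's Diner" -- whitespace stripped
print(item.url)   # "No url" -- empty string replaced with default text
```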
Once we have a `dataclass`, we need a way to store it. This is where our `DataPipeline` comes into play. This class opens a pipe to a CSV file and filters out duplicates.
```
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
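Here is a minimal, hypothetical sketch of how the pipeline is meant to be used; the crawler below does the same thing with real scraped results:

```
pipeline = DataPipeline(csv_filename="restaurants.csv")
pipeline.add_data(SearchData(name="Joe's Diner", stars=4.5, url="https://example.com/joes", rating_count=120))
pipeline.add_data(SearchData(name="Joe's Diner", stars=4.5, url="https://example.com/joes", rating_count=120))  # duplicate name, dropped
pipeline.close_pipeline()  # flush whatever is left in the queue to the CSV
```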
```
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, location, localities, data_pipeline=None, retries=3):
    for locality in localities:
        scrape_search_results(keyword, location, locality, data_pipeline=data_pipeline, retries=retries)

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
- We open a new `DataPipeline`: `crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")`.
- We pass it into `start_scrape()`, which in turn passes it into `scrape_search_results()`.
- Inside the parser, we build a `SearchData` object and pass it into the pipeline.
- Once the crawl is finished, we close the pipeline with `crawl_pipeline.close_pipeline()`.
`start_scrape()` already gives us the ability to scrape a list of `localities`. Instead of scraping them one at a time with a `for` loop, we should be scraping as many as possible at one time. This is where concurrency comes into play.

We're going to refactor `start_scrape()` to use multithreading instead of a `for` loop.
```
def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )
```
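If the repeated lists look odd, here is a tiny standalone illustration (with made-up values) of how `executor.map()` pairs the argument lists up, one element from each list per call:

```
import concurrent.futures

def fake_scrape(keyword, locality):
    print(f"scraping '{keyword}' near {locality}")

localities = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # runs fake_scrape("restaurant", "42.3,-83.5"), fake_scrape("restaurant", "42.35,-83.5"), ...
    executor.map(fake_scrape, ["restaurant"] * len(localities), localities)
```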
Here is a breakdown of the arguments we pass to `executor.map()`:

- `scrape_search_results` is what we want to do on all the available threads.
- Our other arguments get passed in as lists, which then get fed one element at a time into `scrape_search_results`.
- `localities` is the list of areas we'd like to search.
- Every other list needs to be the same length as `localities`.

```
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
Along with our standard proxy arguments, this function takes two you might not have seen before: `wait` and `residential`.
```
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
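As a quick illustration (assuming `API_KEY` has already been loaded from `config.json`), every target URL gets wrapped into a proxied ScrapeOps URL; the printed output below is abbreviated:

```
target = "https://www.google.com/maps/search/restaurant/@42.3,-83.5,14z/data=!3m1!4b1?entry=ttu"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.google.com%2Fmaps%2F...&country=us&wait=5000&residential=True
```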
"api_key"
: holds your ScrapeOps API key.
`"url"`: is the url we want to scrape.
`"country"`: is the location we'd like to be routed through.
`"wait": 5000`: tells the ScrapeOps server that we want it to wait 5 seconds for content to render before sending our response back.
`"residential": True`: tells ScrapeOps that we want to use a residential IP address. This greatly decreases our likelihood of getting blocked.

Here is our crawler with the proxy function added:

```
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
Our crawler now runs through the proxy and scrapes a full list of `localities`. If you want to tweak your results, go ahead and change any of the constants in the `main` below.
```
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
To find the business info on each page, we find all of the `div` objects. Then we iterate through them until we've found the `div` that holds the business info.
Once we've found that card, we pull the address and business hours from it.
```
def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False
    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                info_cards = soup.find_all("div")
                for card in info_cards:
                    aria_label = card.get("aria-label")
                    if not aria_label:
                        continue
                    if "Information" not in aria_label:
                        continue
                    print("card exists")
                    button = card.find("button")
                    address = button.text.replace("", "")
                    address_array = address.split(",")
                    street_address = address_array[0]
                    city = address_array[1]
                    state_and_zip = address_array[2]

                    sunday = ""
                    monday = ""
                    tuesday = ""
                    wednesday = ""
                    thursday = ""
                    friday = ""
                    saturday = ""

                    hours_cards = card.find_all("tr")
                    for card in hours_cards:
                        row_text = card.text
                        if "Sunday" in row_text:
                            sunday = row_text.replace("Sunday", "")
                        elif "Monday" in row_text:
                            monday = row_text.replace("Monday", "")
                        elif "Tuesday" in row_text:
                            tuesday = row_text.replace("Tuesday", "")
                        elif "Wednesday" in row_text:
                            wednesday = row_text.replace("Wednesday", "")
                        elif "Thursday" in row_text:
                            thursday = row_text.replace("Thursday", "")
                        elif "Friday" in row_text:
                            friday = row_text.replace("Friday", "")
                        elif "Saturday" in row_text:
                            saturday = row_text.replace("Saturday", "")
                        else:
                            continue

                    business_data = {
                        "name": row["name"],
                        "street_address": street_address,
                        "city": city,
                        "state_and_zip": state_and_zip,
                        "sunday": sunday,
                        "monday": monday,
                        "tuesday": tuesday,
                        "wednesday": wednesday,
                        "thursday": thursday,
                        "friday": friday,
                        "saturday": saturday
                    }
                    print(business_data)
                    break
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
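Before breaking the function down, here is the address-splitting step on its own, using a made-up address string:

```
address = "123 Main St, Detroit, MI 48201"  # hypothetical example
address_array = address.split(",")
street_address = address_array[0]           # "123 Main St"
city = address_array[1]                     # " Detroit" (note the leading space)
state_and_zip = address_array[2]            # " MI 48201"
```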
- `if not aria_label` is used to filter out any `div` cards that don't hold the `aria-label` attribute.
- `if "Information" not in aria_label` is used to skip any `div` cards that have the wrong `aria-label`.
- We find the `button` element and then use some string splitting to separate the different pieces of the address: `street_address`, `city`, `state_and_zip`.
- We find the `tr` elements and assign the daily hours to their corresponding variables.

Next, we need to run this parsing function on every business from our crawl. `process_results()` does exactly that. It reads the CSV file, and then it runs `process_business()` on each and every one of those results.
```
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_business(row, location, retries=retries)
```
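For example, once a crawl has produced `restaurant.csv`, the parse stage would be kicked off like this (mirroring what `main` does later):

```
process_results("restaurant.csv", "us", retries=3)
```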
```
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )

def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False
    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                info_cards = soup.find_all("div")
                for card in info_cards:
                    aria_label = card.get("aria-label")
                    if not aria_label:
                        continue
                    if "Information" not in aria_label:
                        continue
                    print("card exists")
                    button = card.find("button")
                    address = button.text.replace("", "")
                    address_array = address.split(",")
                    street_address = address_array[0]
                    city = address_array[1]
                    state_and_zip = address_array[2]

                    sunday = ""
                    monday = ""
                    tuesday = ""
                    wednesday = ""
                    thursday = ""
                    friday = ""
                    saturday = ""

                    hours_cards = card.find_all("tr")
                    for card in hours_cards:
                        row_text = card.text
                        if "Sunday" in row_text:
                            sunday = row_text.replace("Sunday", "")
                        elif "Monday" in row_text:
                            monday = row_text.replace("Monday", "")
                        elif "Tuesday" in row_text:
                            tuesday = row_text.replace("Tuesday", "")
                        elif "Wednesday" in row_text:
                            wednesday = row_text.replace("Wednesday", "")
                        elif "Thursday" in row_text:
                            thursday = row_text.replace("Thursday", "")
                        elif "Friday" in row_text:
                            friday = row_text.replace("Friday", "")
                        elif "Saturday" in row_text:
                            saturday = row_text.replace("Saturday", "")
                        else:
                            continue

                    business_data = {
                        "name": row["name"],
                        "street_address": street_address,
                        "city": city,
                        "state_and_zip": state_and_zip,
                        "sunday": sunday,
                        "monday": monday,
                        "tuesday": tuesday,
                        "wednesday": wednesday,
                        "thursday": thursday,
                        "friday": friday,
                        "saturday": saturday
                    }
                    print(business_data)
                    break
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_business(row, location, retries=retries)

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
```
To store this information properly, we need another `dataclass`. We'll call this one `BusinessData`. This class will hold the business's address data and hours. After we instantiate a `BusinessData` object, we'll need to pass it into a `DataPipeline` like we did before.

Here is our `BusinessData` class.
```
@dataclass
class BusinessData:
    name: str = ""
    street_address: str = ""
    city: str = ""
    state_and_zip: str = ""
    sunday: str = ""
    monday: str = ""
    tuesday: str = ""
    wednesday: str = ""
    thursday: str = ""
    friday: str = ""
    saturday: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
In the full code below, we open a new `DataPipeline` from inside our parsing function. We then create a `BusinessData` object to pass into it and close the pipeline once we've stored the data.
```
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class BusinessData:
    name: str = ""
    street_address: str = ""
    city: str = ""
    state_and_zip: str = ""
    sunday: str = ""
    monday: str = ""
    tuesday: str = ""
    wednesday: str = ""
    thursday: str = ""
    friday: str = ""
    saturday: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )

def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False
    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                business_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
                info_cards = soup.find_all("div")
                for card in info_cards:
                    aria_label = card.get("aria-label")
                    if not aria_label:
                        continue
                    if "Information" not in aria_label:
                        continue
                    print("card exists")
                    button = card.find("button")
                    address = button.text.replace("", "")
                    address_array = address.split(",")
                    street_address = address_array[0]
                    city = address_array[1]
                    state_and_zip = address_array[2]

                    sunday = ""
                    monday = ""
                    tuesday = ""
                    wednesday = ""
                    thursday = ""
                    friday = ""
                    saturday = ""

                    hours_cards = card.find_all("tr")
                    for card in hours_cards:
                        row_text = card.text
                        if "Sunday" in row_text:
                            sunday = row_text.replace("Sunday", "")
                        elif "Monday" in row_text:
                            monday = row_text.replace("Monday", "")
                        elif "Tuesday" in row_text:
                            tuesday = row_text.replace("Tuesday", "")
                        elif "Wednesday" in row_text:
                            wednesday = row_text.replace("Wednesday", "")
                        elif "Thursday" in row_text:
                            thursday = row_text.replace("Thursday", "")
                        elif "Friday" in row_text:
                            friday = row_text.replace("Friday", "")
                        elif "Saturday" in row_text:
                            saturday = row_text.replace("Saturday", "")
                        else:
                            continue

                    business_data = BusinessData(
                        name=row["name"],
                        street_address=street_address,
                        city=city,
                        state_and_zip=state_and_zip,
                        sunday=sunday,
                        monday=monday,
                        tuesday=tuesday,
                        wednesday=wednesday,
                        thursday=thursday,
                        friday=friday,
                        saturday=saturday
                    )
                    business_pipeline.add_data(business_data)
                    break
                business_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_business(row, location, retries=retries)

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
```
Once again, we use `ThreadPoolExecutor` to replace a `for` loop. Just like before, our first arg is the function we'd like to call. Each argument after is a list that gets passed into `process_business`.
```
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
- `process_business` is the function we want to call on open threads.
- All of our other arguments to `process_business` get passed in as lists.

Finally, we route our business page requests through the proxy as well:

```
response = requests.get(get_scrapeops_url(url, location=location))
```
```
import os
import re
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    rating_count: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class BusinessData:
    name: str = ""
    street_address: str = ""
    city: str = ""
    state_and_zip: str = ""
    sunday: str = ""
    monday: str = ""
    tuesday: str = ""
    wednesday: str = ""
    thursday: str = ""
    friday: str = ""
    saturday: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return
        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, locality, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/maps/search/{formatted_keyword}/@{locality},14z/data=!3m1!4b1?entry=ttu"
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            business_links = soup.select("div div a")
            excluded_words = ["Sign in"]
            for business_link in business_links:
                name = business_link.get("aria-label")
                if not name or name in excluded_words or "Visit" in name:
                    continue
                maps_link = business_link.get("href")
                full_card = business_link.parent
                rating_holder = full_card.select_one("span[role='img']")
                rating = 0.0
                rating_count = 0
                if rating_holder:
                    rating_array = rating_holder.text.split("(")
                    rating = rating_array[0]
                    rating_count = int(rating_array[1].replace(")", "").replace(",", ""))
                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=maps_link,
                    rating_count=rating_count
                )
                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, location, localities, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * len(localities),
            [location] * len(localities),
            localities,
            [data_pipeline] * len(localities),
            [retries] * len(localities)
        )

def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False
    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                business_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
                info_cards = soup.find_all("div")
                for card in info_cards:
                    aria_label = card.get("aria-label")
                    if not aria_label:
                        continue
                    if "Information" not in aria_label:
                        continue
                    print("card exists")
                    button = card.find("button")
                    address = button.text.replace("", "")
                    address_array = address.split(",")
                    street_address = address_array[0]
                    city = address_array[1]
                    state_and_zip = address_array[2]

                    sunday = ""
                    monday = ""
                    tuesday = ""
                    wednesday = ""
                    thursday = ""
                    friday = ""
                    saturday = ""

                    hours_cards = card.find_all("tr")
                    for card in hours_cards:
                        row_text = card.text
                        if "Sunday" in row_text:
                            sunday = row_text.replace("Sunday", "")
                        elif "Monday" in row_text:
                            monday = row_text.replace("Monday", "")
                        elif "Tuesday" in row_text:
                            tuesday = row_text.replace("Tuesday", "")
                        elif "Wednesday" in row_text:
                            wednesday = row_text.replace("Wednesday", "")
                        elif "Thursday" in row_text:
                            thursday = row_text.replace("Thursday", "")
                        elif "Friday" in row_text:
                            friday = row_text.replace("Friday", "")
                        elif "Saturday" in row_text:
                            saturday = row_text.replace("Saturday", "")
                        else:
                            continue

                    business_data = BusinessData(
                        name=row["name"],
                        street_address=street_address,
                        city=city,
                        state_and_zip=state_and_zip,
                        sunday=sunday,
                        monday=monday,
                        tuesday=tuesday,
                        wednesday=wednesday,
                        thursday=thursday,
                        friday=friday,
                        saturday=saturday
                    )
                    business_pipeline.add_data(business_data)
                    break
                business_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
As before, feel free to change any of the constants inside our `main`.
```
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    LOCALITIES = ["42.3,-83.5", "42.35,-83.5", "42.4,-83.5"]
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurant"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, LOCATION, LOCALITIES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
When you scrape Google Maps, you are subject to Google's terms of service and their `robots.txt`. Violating these terms could get your Google account suspended or even permanently deleted. Be careful about breaking rules online.
Always remember that public data is generally considered public knowledge, so when you scrape it, you're usually in the clear legally.
Private data is another matter. When you scrape private data, you are subject to that company's terms and the privacy laws that apply to that company.
If you don't know whether or not your scraper is legal, you need to consult an attorney.