Then check out ScrapeOps, the complete toolkit for web scraping.
To use the finished scraper below, create a config.json file with your ScrapeOps API key and place it in the same folder as this script.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    print(proxy_url)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    sponsored: bool = False
    stars: float = 0
    rank: int = 0
    review_count: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class RestaurantData:
    name: str = ""
    family_friendly: bool = False
    date: str = ""
    position: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    # possibly need to urlencode location: city + state + zip code
    url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[data-testid='serp-ia-card']")

            for div_card in div_cards:
                card_text = div_card.text
                sponsored = card_text[0].isdigit() == False
                ranking = None

                img = div_card.find("img")
                title = img.get("alt")

                if not sponsored:
                    rank_string = card_text.replace(title, "").split(".")
                    if len(rank_string) > 0:
                        ranking = int(rank_string[0])

                has_rating = div_card.select_one("div span[data-font-weight='semibold']")
                rating = 0.0
                if len(has_rating.text) > 0:
                    if has_rating.text[0].isdigit():
                        rating = float(has_rating.text)

                review_count = 0
                if "review" in card_text:
                    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

                a_element = div_card.find("a")
                link = a_element.get("href")
                yelp_url = f"https://www.yelp.com{link}"

                search_data = SearchData(
                    name=title,
                    sponsored=sponsored,
                    stars=rating,
                    rank=ranking,
                    review_count=review_count,
                    url=yelp_url
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

                info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
                list_elements = info_section["itemListElement"]

                unknown_count = 1
                for element in list_elements:
                    name = element["author"]["name"]
                    if name == "Unknown User":
                        name = f"{name}{unknown_count}"
                        unknown_count += 1
                    family_friendly = element["isFamilyFriendly"]
                    date = element.get("uploadDate")
                    position = element["position"]

                    restaurant_data = RestaurantData(
                        name=name,
                        family_friendly=family_friendly,
                        date=date,
                        position=position
                    )
                    review_pipeline.add_data(restaurant_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 4
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurants"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
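The script reads the key from config["api_key"], so a minimal config.json only needs that one field. Something like this should work, with your own ScrapeOps API key in place of the placeholder:

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}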
MAX_RETRIES: the maximum number of times the script will retry an operation (such as a failed request) before giving up.
MAX_THREADS: the maximum number of threads that can run concurrently. It controls how many parallel tasks are used for scraping and processing data.
PAGES: how many pages of search results should be scraped for each keyword.
LOCATION: the location or country for the search query, which is used in the search URL.

Exercise caution when changing your keyword_list. Yelp uses different CSS and layouts for different types of businesses.

From each search result card, we collect the following:
name: the name of the business.
sponsored: a boolean variable. If the post is an ad, sponsored is True.
stars: how many stars the business has, based on its overall reviews.
rank: where the business shows up in our search results.
review_count: the number of reviews the business has.
url: the URL of the business's Yelp page.

From each business page, we collect the following for every review:
name: the name of the reviewer.
family_friendly: whether or not the reviewer considers the business to be family friendly.
date: the date the review was uploaded.
position: the position of the review on the page. For instance, the top review has a position of 1.

Our search URLs are laid out like this:

https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}

For example, a search for restaurants in the US looks like this:
https://www.yelp.com/search?find_desc=restaurants&find_loc=us
Individual business pages use URLs that contain /biz/. We don't have to worry too much about how these URLs are constructed because we'll be extracting them straight from the search results.

Each result card on the search page has a data-testid of serp-ia-card. You can take a look below. By identifying these cards, we can go through and extract the information we need from each one. On the individual business page, we're actually going to be pulling our data from a JSON blob embedded in the page.

Our search URL takes a start param as well. If we want to start at 0 (this would be page 0) and get results up to 10, we pass start=0. For page 2, we'd pass start=10. This process repeats all the way down the line: to fetch each batch, we multiply our page number by 10.

Our location gets passed in through the find_loc param. We could separate the proxy location and the search location into two unique variables, but for the purpose of this tutorial, we're going to use the same place. For instance, if we're using a US based proxy server, we'll pass us in for our search location as well.

To get started, create a new project folder:

mkdir yelp-scraper
cd yelp-scraper
python -m venv venv
source venv/bin/activate

pip install requests
pip install beautifulsoup4

With the environment set up, here is the first iteration of our crawler:

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    # possibly need to urlencode location: city + state + zip code
    url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[data-testid='serp-ia-card']")

            for div_card in div_cards:
                card_text = div_card.text
                sponsored = card_text[0].isdigit() == False
                ranking = None

                img = div_card.find("img")
                title = img.get("alt")

                if not sponsored:
                    rank_string = card_text.replace(title, "").split(".")
                    ranking = int(rank_string[0])

                has_rating = div_card.select_one("div span[data-font-weight='semibold']")
                rating = 0.0
                if len(has_rating.text) > 0:
                    if has_rating.text[0].isdigit():
                        rating = float(has_rating.text)

                review_count = 0
                if "review" in card_text:
                    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

                a_element = div_card.find("a")
                link = a_element.get("href")
                yelp_url = f"https://www.yelp.com{link}"

                search_data = {
                    "name": title,
                    "sponsored": sponsored,
                    "stars": rating,
                    "rank": ranking,
                    "review_count": review_count,
                    "url": yelp_url
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 2
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurants"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")

In the code above, we find all of our result cards with the selector "div[data-testid='serp-ia-card']". We then use the alt text from the image to pull the title of the business.

Next, we need to figure out whether each card is sponsored or an actual search result. We use sponsored = card_text[0].isdigit() == False because all ranked results have a number before their name.

If the result is not sponsored, we use rank_string = card_text.replace(title, "").split(".") to split up the ranks. For instance, if the card title is 1. My Cool Business, we split at . and our rank is 1.

Finally, we find the a_element and extract its href to get the link to the Yelp page of the business.

To paginate our results, we add a page_number argument and change one part of our parsing function, the URL:

url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"

We also add a start_scrape() function which allows us to scrape multiple pages:

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = { "name": title, "sponsored": sponsored, "stars": rating, "rank": rank, "review_count": review_count, "url": yelp_url } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
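To make the pagination concrete, here is a quick standalone sketch (separate from the crawler above) of how page numbers translate into the start values in our search URLs:

# Illustrative only: how page numbers map to the start offsets in our search URLs.
formatted_keyword = "restaurants"
location = "us"

for page_number in range(3):
    url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
    print(url)  # start=0, then start=10, then start=20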
First, we add a SearchData class. The purpose of this one is simply to hold our data while it's waiting to be stored. Then, we'll add a DataPipeline class. The DataPipeline is extremely important: this class takes all of our data and pipes it to a CSV file. It also filters out duplicates.

Here is the SearchData class:

@dataclass
class SearchData:
    name: str = ""
    sponsored: bool = False
    stars: float = 0
    rank: int = 0
    review_count: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
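As a quick illustration (assuming the class above is already defined), check_string_fields() replaces empty strings with a placeholder and strips stray whitespace from the rest:

# Quick demo of SearchData's string cleanup.
item = SearchData(name="  Some Restaurant  ", url="")
print(item.name)  # "Some Restaurant": whitespace stripped
print(item.url)   # "No url": empty strings get a default placeholder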
Here is the DataPipeline:

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
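Here's a minimal sketch of how the pipeline gets used, following the same pattern as the finished scraper: create it with a filename, feed it SearchData objects, then close it so any queued records get flushed. Note that close_pipeline() calls time.sleep(), so the script needs import time at the top.

import time  # needed because close_pipeline() calls time.sleep()

pipeline = DataPipeline(csv_filename="restaurants.csv")

pipeline.add_data(SearchData(name="Example Bistro", stars=4.5, url="https://www.yelp.com/biz/example-bistro"))
pipeline.add_data(SearchData(name="Example Bistro", stars=4.5, url="https://www.yelp.com/biz/example-bistro"))  # duplicate name, dropped

pipeline.close_pipeline()  # flushes anything still in the queue to restaurants.csv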
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Instead of printing our parsed data as a dict, we use it to create a SearchData object. We then pass the search_data into the data_pipeline, which stores our data inside of a CSV file.

Next, we'll use ThreadPoolExecutor to scrape multiple pages simultaneously on separate threads. To do this, we only need to change one function, start_scrape().

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
for loop, we pass scrape_search_results into executor.map() along with all of our arguments. Notice that we pass our arguments in as arrays this time. Here is our fully updated crawler.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
The wait parameter also tells the ScrapeOps server to wait for two seconds before sending the page back to us. You don't have to print the proxy_url, but when you're debugging, it comes in really handy.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    print(proxy_url)
    return proxy_url
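To route a request through the proxy, we simply wrap the target URL with this function before handing it to requests.get(), which is exactly what the scraper does. A quick sketch:

import requests

# Send a normal Yelp URL through the ScrapeOps proxy (get_scrapeops_url is defined above).
target_url = "https://www.yelp.com/search?find_desc=restaurants&find_loc=us"
response = requests.get(get_scrapeops_url(target_url, location="us"))
print(response.status_code)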
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Now let's run the crawler from our main. Feel free to change any of the constants here that you'd like. I changed MAX_THREADS to 4.

Exercise caution when changing your keyword_list. Yelp uses slightly different layouts and CSS for different types of businesses.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 4
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurants"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")

If your requests start getting blocked, you can add "residential": True to get_scrapeops_url(). This asks ScrapeOps to route the request through a residential IP address instead of a datacenter one, which makes it far less likely to get blocked.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": "us",
        "wait": 2000,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    print(proxy_url)
    return proxy_url
def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                info_section = json.loads(soup.select_one("script[type='application/ld+json']").text)
                list_elements = info_section["itemListElement"]

                unknown_count = 1
                for element in list_elements:
                    name = element["author"]["name"]
                    if name == "Unknown User":
                        name = f"{name}{unknown_count}"
                        unknown_count += 1
                    family_friendly = element["isFamilyFriendly"]
                    date = element.get("uploadDate")
                    position = element["position"]

                    restaurant_data = {
                        "name": name,
                        "family_friendly": family_friendly,
                        "date": date,
                        "position": position
                    }
                    print(restaurant_data)

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
We find the JSON blob with soup.select_one("script[type='application/ld+json']").text. We then pull the name, family_friendly, date, and position of each review from the JSON.

Next, we need a function to read our CSV file and feed each row into process_business(). That's the job of process_results(). At the moment, it uses a for loop, but we'll add concurrency shortly.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)
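For reference, the fields we read assume the embedded JSON-LD looks roughly like the sketch below. The blob Yelp actually serves contains many more keys; this is a trimmed, illustrative example of only the parts process_business() touches:

# Trimmed, illustrative shape of the JSON-LD blob that process_business() reads.
info_section = {
    "itemListElement": [
        {
            "author": {"name": "Unknown User"},
            "isFamilyFriendly": True,
            "uploadDate": "2024-01-15",
            "position": 1
        }
    ]
}

for element in info_section["itemListElement"]:
    print(element["author"]["name"], element["isFamilyFriendly"], element.get("uploadDate"), element["position"])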
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") info_section = json.loads(soup.select_one("script[type='application/ld+json']").text) list_elements = info_section["itemListElement"] unknown_count = 1 for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{unknown_count}" unknown_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] restaurant_data = { "name": name, "family_friendly": family_friendly, "date": date, "position": position } print(restaurant_data) success = True else: logger.warning(f"Failed 
Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Since we already have a working DataPipeline, we just need to create another dataclass, RestaurantData.

Here is our new class:

@dataclass
class RestaurantData:
    name: str = ""
    family_friendly: bool = False
    date: str = ""
    position: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
DataPipeline.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass RestaurantData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") info_section = json.loads(soup.select_one("script[type='application/ld+json']").text) list_elements = info_section["itemListElement"] unknown_count = 1 for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{unknown_count}" unknown_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] restaurant_data = RestaurantData( name=name, family_friendly=family_friendly, date=date, 
position=position ) review_pipeline.add_data(restaurant_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To add concurrency to the review scraper, we once again replace a for loop with executor.map(). As before, our first argument is our parsing function. All subsequent arguments are arrays that get passed into our parsing function.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
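If the list arguments look odd, here is a tiny standalone sketch of how executor.map() pairs them up: it takes one element from each iterable per call, so every row gets the same location and retry count.

import concurrent.futures

def show(row, location, retries):
    print(row, location, retries)

rows = ["row1", "row2", "row3"]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Calls show("row1", "us", 3), show("row2", "us", 3), show("row3", "us", 3) across the pool.
    executor.map(show, rows, ["us"] * len(rows), [3] * len(rows))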
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    print(proxy_url)
    return proxy_url
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass RestaurantData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") info_section = json.loads(soup.select_one("script[type='application/ld+json']").text) list_elements = info_section["itemListElement"] unknown_count = 1 for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{unknown_count}" unknown_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] restaurant_data = RestaurantData( name=name, 
family_friendly=family_friendly, date=date, position=position ) review_pipeline.add_data(restaurant_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Now it's time to run the full scraper in production. If you'd like to change your results, go ahead and adjust any of the constants inside of main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 4
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurants"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
When you scrape Yelp, you're subject to both their terms of service and their robots.txt. You can view Yelp's terms here. Their robots.txt is available here.

Remember, if you violate a site's policies, you can get suspended or even permanently banned from using their services.

Public data is usually fair game for scraping. When information is not gated behind a login or some other type of authentication, it is considered public. If you need to log in to access the information, it is considered private data.

If you have concerns about the legality of your scraper, consult an attorney.

Then check out ScrapeOps, the complete toolkit for web scraping.
config.json file with your ScrapeOps API keys and place it in the same folder as this script.import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.implicitly_wait(10) driver.get(get_scrapeops_url(url, location=location)) try: review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") info_section = json.loads(script.get_attribute("innerHTML")) anon_count = 1 list_elements = info_section["itemListElement"] for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{anon_count}" anon_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] review_data = ReviewData( name=name, family_friendly=family_friendly, date=date, position=position ) 
review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
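Before the production run, make sure your ScrapeOps credentials are in place. Judging from the loading code (config["api_key"]), a minimal config.json only needs a single key; the value below is just a placeholder:

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}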
Create a config.json file with your ScrapeOps API key and place it in the same folder as this script.

Feel free to tweak any of the following constants:

MAX_RETRIES: Defines the maximum number of times the script will attempt to retry an operation (such as scraping a page) in case of failure.
MAX_THREADS: Sets the maximum number of threads that can run concurrently. It controls how many parallel tasks can be used for scraping or processing data.
PAGES: Defines how many pages of search results should be scraped for each keyword.
LOCATION: Specifies the location or country for the search query, which is used in the search URL.

If you change the keyword_list, make sure to inspect the pages you're scraping. The layout and CSS might change.

For each business in the search results, the crawler saves the following:

name: the name of the business.
sponsored: a boolean variable. If the post is an ad, sponsored is True.
stars: how many stars the business has based on overall reviews.
rank: where the business shows up in our search results.
review_count: the number of reviews the business has.
url: the url to the Yelp page for the business.

For each review, the scraper saves:

name: the name of the reviewer.
family_friendly: whether or not they consider the business to be family friendly.
date: the date that the review was uploaded.
position: the position of the review on the page. For instance, the top review would have the position of 1.

Yelp search URLs follow this format:

https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}
For example, here is the URL for restaurants in the us:
https://www.yelp.com/search?find_desc=restaurants&find_loc=us

find_desc is our search parameter and find_loc is our location. Take a look at the image below and you can see for yourself.

Business pages on Yelp all have a url that comes after /biz/. We don't need to worry too much about these because we'll be extracting them straight from our search pages. Take a look at the image below.

Each result card on the search page has a data-testid of serp-ia-card. Once we can find these cards, we can go through and extract their information. Take a look at the picture below so you can get a better understanding of this.

When dealing with businesses on Yelp, much of our review data gets embedded in a JSON blob on the page. Take a look below.

Pagination is controlled with the start param, so we don't need to specify a page number in the URL. Each page gets 10 results, so we multiply our page number by 10. Page 0 (start=0) will give us results 1 through 10. Page 1 (start=10) will give us 11 through 20... and so on and so forth.

To handle location, we'll pass us in as our country to the ScrapeOps API and we'll also pass it into the find_loc param of our Yelp URL.

To get started, create a new project folder and move into it:

mkdir yelp-scraper
cd yelp-scraper
python -m venv venvsource venv/bin/activatepip install seleniumimport osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = { "name": title, "sponsored": sponsored, "stars": stars, "rank": ranking, "review_count": review_count, "url": yelp_url } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
OPTIONS.add_argument("--headless") sets our browser to run in headless mode. This saves valuable resources.options=OPTIONS in order to ensure that we're always running in headless mode.driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")sponsored = card_text[0].isdigit() == False. Since all non sponsored items are ranked, all of them begin with a digit.img and use its alt to pull the name of the business, img.get_attribute("alt").. and pull the first element from list and convert it to an integer."div span[data-font-weight='semibold']". If there is a rating present, we ectract it.review is present. If it is, we once again use the .split() method to extract the review count.a_element and get its href to get the link to the page for each individual business.start parameter.Take a look at the code below.import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = { "name": title, "sponsored": sponsored, "stars": stars, "rank": ranking, "review_count": review_count, "url": yelp_url } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: 
        start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
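To make those text-parsing heuristics a little more concrete, here's a small sketch of how the rank and review count fall out of a card's text. The sample card_text string is made up purely for illustration; real cards will vary:

# Hypothetical card text for a non-sponsored result (illustration only)
card_text = "1. Example Bistro4.5 (312 reviews)$$Breakfast & Brunch"
title = "Example Bistro"  # normally taken from the card's img alt attribute

sponsored = card_text[0].isdigit() == False            # ranked results start with a digit
rank_string = card_text.replace(title, "").split(".")
ranking = int(rank_string[0])                          # "1" -> 1

review_count = 0
if "review" in card_text:
    # text between the parentheses, then the number before the word "reviews"
    review_count = card_text.split("(")[1].split(")")[0].split(" ")[0]

print(sponsored, ranking, review_count)                # False 1 312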
Our pagination is handled by the new start_scrape() function. At the moment, this doesn't do much other than give us the ability to scrape multiple pages. Later on, we'll add concurrency to this function.

Next, we need proper storage. We'll use a SearchData class to hold our data. Then this data will get passed into our DataPipeline. This DataPipeline pipes our data straight to a CSV file while removing duplicates.

First, take a look at our SearchData.

@dataclass
class SearchData:
    name: str = ""
    sponsored: bool = False
    stars: float = 0
    rank: int = 0
    review_count: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
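Under the hood, __post_init__ gives every empty string field a readable placeholder and strips stray whitespace. A quick sketch of that behavior (the sample values are made up):

item = SearchData(name="  Example Bistro  ", url="")
print(item.name)  # "Example Bistro" -- leading/trailing spaces stripped
print(item.url)   # "No url" -- empty strings get a default label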
Next, here is our DataPipeline.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            # sleep is imported from time; give any in-flight CSV write a moment to finish
            sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
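Here's a minimal sketch of how the pipeline gets used, mirroring what our main block does later: create it with a CSV filename, feed it dataclass instances, then close it so anything left in the queue gets flushed to disk. The business values here are made up.

pipeline = DataPipeline(csv_filename="restaurants.csv")
pipeline.add_data(SearchData(name="Example Bistro", stars=4.5, rank=1))
pipeline.add_data(SearchData(name="Example Bistro"))  # duplicate name: logged and dropped
pipeline.close_pipeline()                             # flushes the remaining queue to restaurants.csv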
import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Remember start_scrape() from earlier? Now it's time to add that concurrency. We'll use ThreadPoolExecutor to scrape individual pages concurrently.

Take a look at this function refactored to use multithreading.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
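If the executor.map() call looks dense, it's doing roughly the same work as the simple loop below, except that up to max_threads pages are fetched at the same time (a sketch for intuition, not a replacement):

# Sequential equivalent of the executor.map() call above
for page in range(pages):
    scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)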
scrape_search_results. All other arguments are the arguments passed into this function. We pass these in as arrays so they can then get passed in to each function call.Our full code now looks like this.import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    print(proxy_url)
    return proxy_url
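As a quick sanity check, here's roughly what a wrapped URL looks like when we call the function; the API key and the exact encoding shown in the comment are illustrative only:

proxied = get_scrapeops_url("https://www.yelp.com/search?find_desc=restaurants&find_loc=us")
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.yelp.com%2Fsearch%3F...&country=us&residential=True&wait=2000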
get_scrapeops_url() so it can return our proxied url with all these desired traits.url: the url we'd like to scrape.country: the country we want to be routed through by the API.residential: we want to use the residential proxy service. This greatly increases our chances of success when compared to a data center proxy.wait: we want ScrapeOps to wait 2 seconds for content to render before sending it back to us.import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
You can see our updated main below. We'll be scraping 5 pages of search results.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 4
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurants"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
Feel free to change any of the following to tweak your results:

MAX_THREADS
MAX_RETRIES
LOCATION
PAGES

Now that we're crawling businesses, we need to scrape their reviews. Our scraper looks up each business page, finds the ld+json script item on the page and pulls JSON data from that item.

Here is our process_business() function.

def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        driver.implicitly_wait(10)
        driver.get(url)

        try:
            script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
            info_section = json.loads(script.get_attribute("innerHTML"))
            anon_count = 1

            list_elements = info_section["itemListElement"]
            for element in list_elements:
                name = element["author"]["name"]
                if name == "Unknown User":
                    name = f"{name}{anon_count}"
                    anon_count += 1
                family_friendly = element["isFamilyFriendly"]
                date = element.get("uploadDate")
                position = element["position"]

                review_data = {
                    "name": name,
                    "family_friendly": family_friendly,
                    "date": date,
                    "position": position
                }
                print(review_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
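The JSON blob this function parses comes from the ld+json script embedded in the page. Here's a trimmed-down sketch of the shape the code above expects; the field names are exactly the ones we read, while the values are invented for illustration:

info_section = {
    "itemListElement": [
        {
            "position": 1,
            "author": {"name": "Unknown User"},
            "isFamilyFriendly": True,
            "uploadDate": "2024-01-15"
            # ...plus other keys we don't use
        }
    ]
}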
First, we find the script element with driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']"). Then we call json.loads() on its innerHTML. Each review gets pulled from the dict returned by json.loads().

Next, we need to read our CSV file and call process_business() on each of the rows from the file.

Here is our process_results() function.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)
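One detail worth calling out from process_business(): some reviewers come through as "Unknown User", so the scraper numbers them, which also keeps distinct anonymous reviewers from colliding when we later dedupe on name. A tiny sketch of that logic with made-up names:

anon_count = 1
for raw_name in ["Unknown User", "Jane D.", "Unknown User"]:
    name = raw_name
    if name == "Unknown User":
        name = f"{name}{anon_count}"
        anon_count += 1
    print(name)
# Unknown User1
# Jane D.
# Unknown User2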
import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.implicitly_wait(10) driver.get(url) try: script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") info_section = json.loads(script.get_attribute("innerHTML")) anon_count = 1 list_elements = info_section["itemListElement"] for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{anon_count}" anon_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] review_data = { "name": name, "family_friendly": family_friendly, "date": date, "position": position } print(review_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") 
logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
To store our reviews properly, we need another dataclass, ReviewData. It's almost identical to the SearchData class from earlier. It just holds slightly different information.

Take a look at ReviewData.

@dataclass
class ReviewData:
    name: str = ""
    family_friendly: bool = False
    date: str = ""
    position: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
DataPipeline. This version of our script does exactly that.import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.implicitly_wait(10) driver.get(url) try: review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") info_section = json.loads(script.get_attribute("innerHTML")) anon_count = 1 list_elements = info_section["itemListElement"] for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{anon_count}" anon_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] review_data = ReviewData( name=name, family_friendly=family_friendly, date=date, position=position ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() 
success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
To finish things off, we'll process our businesses concurrently with ThreadPoolExecutor. Let's rewrite process_results() to do just that.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
get_scrapeops_url(). All that changes is one line of our parsing function.driver.get(get_scrapeops_url(url, location=location))import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.implicitly_wait(10) driver.get(get_scrapeops_url(url, location=location)) try: review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") info_section = json.loads(script.get_attribute("innerHTML")) anon_count = 1 list_elements = info_section["itemListElement"] for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{anon_count}" anon_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] review_data = ReviewData( name=name, family_friendly=family_friendly, date=date, position=position ) 
review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Now let's update our main. Once again, we'll start with a 5 page crawl.

```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 4
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurants"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
robots.txt here. Scraping public information is generally considered legal. Public information on the web is any information that is not gated behind a login. If you need to log in to view the data, it is considered private data. If you have questions about the legality of a scraping job, you should consult an attorney.

Then check out ScrapeOps, the complete toolkit for web scraping.
config.json with your ScrapeOps API keys to the folder.Yelp is very good at blocking scrapers but you don't need to worry about it because this one comes pre-built with support for the ScrapeOps Residential Proxy!const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); 
const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 }); const infoSectionElement = await page.$("script[type='application/ld+json']"); const infoText = await page.evaluate(element => element.textContent, infoSectionElement); const infoSection = JSON.parse(infoText); const listElements = infoSection.itemListElement; let anonCount = 1; for (const element of listElements) { let name = element.author.name; if (name === "Unknown User") { name = `${name}${anonCount}`; anonCount++; } familyFriendly = element.isFamilyFriendly; date = element.uploadDate, position = element.position const reviewData = { name: name, family_friendly: familyFriendly, date: date, position: position } await writeToCsv([reviewData], `${row.name.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, concurrencyLimit, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); while (businesses.length > 0) { const currentBatch = businesses.splice(0, concurrencyLimit); const tasks = currentBatch.map(business => processBusiness(browser, business, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close(); } async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 5; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); } console.log("Scrape complete");} main();
These constants in main() control how the scraper runs:

- concurrencyLimit: Limits the number of simultaneous tasks (or browser instances/pages) that can run concurrently.
- pages: Determines how many pages of search results to scrape for each keyword.
- location: Specifies the geographic location for the search.
- retries: Sets the number of retry attempts if a scraping task fails due to an error (e.g., network issues or proxy blocks).

You can add more terms to the keywords list, but you need to be cautious when doing so. Yelp uses different CSS and page layouts for different types of businesses. If you add online bank to the keywords, it will break the scraper. If you decide to change the keywords, make sure you inspect the page for your search and adjust the parsing function to fit the page layout.

Yelp search URLs are laid out like this:

```
https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}
```
If we want to look up restaurants in the us, we would use the description parameter find_desc=restaurants and the location parameter find_loc=us. So our complete URL would be:

```
https://www.yelp.com/search?find_desc=restaurants&find_loc=us
```
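For illustration only, here is a minimal sketch of how these two parameters fit together in code. buildYelpSearchUrl() is a hypothetical helper, not part of the scraper we build below; the "+"-formatting of the keyword mirrors what the scraper does later on.

```javascript
// Sketch: build a Yelp search URL from a keyword and a location.
function buildYelpSearchUrl(keyword, location) {
  const formattedKeyword = keyword.replace(" ", "+");
  return `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}`;
}

console.log(buildYelpSearchUrl("restaurants", "us"));
// https://www.yelp.com/search?find_desc=restaurants&find_loc=us
```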
Yelp's business page URLs are laid out with /biz/, and then the name of the business. The URLs here aren't too much of a concern because we're pulling them from the search pages. You can take a look at a Yelp business page below. Its review data is embedded inside a script tag.

Yelp's restaurant results all contain a data-testid of serp-ia-card. You can see it in the image below. Here is the JSON from the business page.

To paginate our results, we use the start parameter. This parameter doesn't actually take our page number, it takes our result number. Yelp gives us 10 results per page.

Our location is controlled with the find_loc parameter. If we pass us into the ScrapeOps API, we will be routed through a server in the US. If we pass us into the find_loc parameter, Yelp will give us restaurants in the US.

To get started, create a new project folder and move into it:

```
mkdir yelp-scraper
cd yelp-scraper
```
npm init --ynpm install puppeteernpm install csv-writernpm install csv-parsenpm install fsconst puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function scrapeSearchResults(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } console.log(searchData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, location, retries) { const browser = await puppeteer.launch() await scrapeSearchResults(browser, keyword, location, retries); await browser.close();} async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 1; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, location, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } } main();
scrapeSearchResults() particularly.await page.$$("div[data-testid='serp-ia-card']")const img = await divCard.$("img");await divCard.$("img").alt for the image: await page.evaluate(element => element.getAttribute("alt"), img).cardText.replace(name, ""); creates a variable of the title without the actual name of the business.let sponsored = isNaN(nameRemoved[0]). All actual results come with a ranking number. If there is no rank in the title, the card is a sponsored ad.. and convert it to a number, Number(rankString[0]). This gives us the ranking number of the result.div span[data-font-weight='semibold'] and if there's a rating present, we pull it from the card.aElement and extract its href in order to get the Yelp page for the business.pageNumber * 10 to the start param of our URL.Take a look at the code below, we added a couple more things, a range() function and we tweaked the startScrape() function to support scraping multiple pages.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } console.log(searchData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, location, retries); } await browser.close();} async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 1; const location = "uk"; const retries 
= 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } } main();
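Since Yelp serves 10 results per page, the start value for page N is simply N * 10. Here is a quick sketch (illustration only, not part of the scraper) of how page numbers map onto start offsets:

```javascript
// Illustration only: Yelp's start parameter is a result offset, not a page number.
// With 10 results per page, page N begins at result N * 10.
for (const pageNumber of [0, 1, 2]) {
  const start = pageNumber * 10;
  console.log(`page ${pageNumber} -> &start=${start}`);
}
// page 0 -> &start=0
// page 1 -> &start=10
// page 2 -> &start=20
```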
Our paginated URLs now look like this:

```
https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}
```

Next, we need a way to store our data. Our storage function should open the file in append mode if the file exists. If the file doesn't exist, our function needs to create it. It also takes in an array of JSON objects, so whenever we pass something into this function, we need to pass it in as an array. Here is writeToCsv().

```javascript
async function writeToCsv(data, outputFile) {
    if (!data || data.length === 0) {
        throw new Error("No data to write!");
    }
    const fileExists = fs.existsSync(outputFile);

    const headers = Object.keys(data[0]).map(key => ({id: key, title: key}))

    const csvWriter = createCsvWriter({
        path: outputFile,
        header: headers,
        append: fileExists
    });

    try {
        await csvWriter.writeRecords(data);
    } catch (e) {
        throw new Error("Failed to write to csv");
    }
}
```
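As a usage sketch, a single row gets appended like this. The record below is made up, but its keys mirror the searchData object we write from the scraper.

```javascript
// Sketch only: writeToCsv() expects an array, even for a single record.
const exampleRow = {
  name: "Example Restaurant",
  sponsored: false,
  stars: 4.5,
  rank: 1,
  review_count: "120",
  url: "https://www.yelp.com/biz/example-restaurant"
};

writeToCsv([exampleRow], "restaurants.csv")
  .then(() => console.log("row appended"))
  .catch(err => console.log(err.message));
```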
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, location, retries); } await browser.close();} async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 1; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } } main();
To add concurrency, we only need to change startScrape(). Instead of a for loop, we're going to shrink our pageList by splicing up to the concurrencyLimit and running scrapeSearchResults() on each page that got spliced. We then await all of these tasks and splice() again. We continue this process until the array shrinks down to nothing. Here is our refactored function.

```javascript
async function startScrape(keyword, pages, location, concurrencyLimit, retries) {
    const pageList = range(0, pages);

    const browser = await puppeteer.launch()

    while (pageList.length > 0) {
        const currentBatch = pageList.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }

    await browser.close();
}
```
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 1; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } } 
main();
const currentBatch = pageList.splice(0, concurrencyLimit); takes a chunk out of our pageList, and we turn that batch into a list of tasks to run. We then wait for the batch to finish with await Promise.all(tasks); and repeat the process until pageList is gone.

Next, we need to route our requests through the ScrapeOps Proxy. Here is getScrapeOpsUrl().

```javascript
function getScrapeOpsUrl(url, location="us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location,
        residential: true,
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}
```
api_key: holds our ScrapeOps API key.url: is the url of the website we'd like to scrape.country: is the country you'd like to be routed through.residential: allows us to use a residential IP address. This greatly increases our chances of getting through anti-bots because we're not showing up at a data center IP.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => 
scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 1; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } } main();
Now let's test out our main on 5 pages of results.

```javascript
async function main() {
    const keywords = ["restaurants"];
    const concurrencyLimit = 4;
    const pages = 5;
    const location = "uk";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        await startScrape(keyword, pages, location, concurrencyLimit, retries);
        console.log("Crawl complete");
        aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`);
    }
}
```
Yelp embeds its review data in a script tag, so our parsing function finds that script tag and extracts JSON from it.

```javascript
async function processBusiness(browser, row, location, retries = 3) {
    const url = row.url;
    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const page = await browser.newPage();

        try {
            await page.goto(url, { timeout: 60000 });

            const infoSectionElement = await page.$("script[type='application/ld+json']");
            const infoText = await page.evaluate(element => element.textContent, infoSectionElement);
            const infoSection = JSON.parse(infoText);
            const listElements = infoSection.itemListElement;

            let anonCount = 1;

            for (const element of listElements) {
                let name = element.author.name;
                if (name === "Unknown User") {
                    name = `${name}${anonCount}`;
                    anonCount++;
                }
                const familyFriendly = element.isFamilyFriendly;
                const date = element.uploadDate;
                const position = element.position;

                const reviewData = {
                    name: name,
                    family_friendly: familyFriendly,
                    date: date,
                    position: position
                }
                console.log(reviewData);
            }

            success = true;
        } catch (err) {
            console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`);
            tries++;
        } finally {
            await page.close();
        }
    }
}
```
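To make the parsing step concrete, here is an illustrative, heavily simplified sketch of the shape processBusiness() expects from that ld+json blob. The real script tag on a Yelp business page contains far more fields than this; only the keys the function actually reads are shown.

```javascript
// Illustrative only: the fields the parser relies on, with made-up values.
const infoSection = {
  itemListElement: [
    {
      author: { name: "Unknown User" },
      isFamilyFriendly: true,
      uploadDate: "2024-01-15",
      position: 1
    }
  ]
};

// Same access pattern processBusiness() uses:
for (const element of infoSection.itemListElement) {
  console.log(element.author.name, element.isFamilyFriendly, element.uploadDate, element.position);
}
```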
First we find the script tag that holds our JSON, await page.$("script[type='application/ld+json']"); and from each review element we pull the following:

- name: the name of the reviewer.
- familyFriendly: whether or not the restaurant is family friendly.
- date: the date the review was uploaded.
- position: the position that the review shows up on the page.

To read the CSV file generated by our crawl, we'll use a readCsv() function.

```javascript
async function readCsv(inputFile) {
    const results = [];
    const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({
        columns: true,
        delimiter: ",",
        trim: true,
        skip_empty_lines: true
    }));

    for await (const record of parser) {
        results.push(record);
    }
    return results;
}
```
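As a quick sketch of what readCsv() hands back (the filename below is illustrative), each CSV row becomes a plain object keyed by the header row, which is exactly what the scraping step iterates over.

```javascript
// Sketch: rows[0].name and rows[0].url are the fields processBusiness() reads.
readCsv("restaurants.csv").then(rows => {
  console.log(rows.length, rows[0]);
});
```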
We call processBusiness() on each row of the file with our processResults() function.

```javascript
async function processResults(csvFile, location, retries) {
    const businesses = await readCsv(csvFile);
    const browser = await puppeteer.launch();

    for (const business of businesses) {
        await processBusiness(browser, business, location, retries)
    }
    await browser.close();
}
```
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, 
location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(url, { timeout: 60000 }); const infoSectionElement = await page.$("script[type='application/ld+json']"); const infoText = await page.evaluate(element => element.textContent, infoSectionElement); const infoSection = JSON.parse(infoText); const listElements = infoSection.itemListElement; let anonCount = 1; for (const element of listElements) { let name = element.author.name; if (name === "Unknown User") { name = `${name}${anonCount}`; anonCount++; } familyFriendly = element.isFamilyFriendly; date = element.uploadDate, position = element.position const reviewData = { name: name, family_friendly: familyFriendly, date: date, position: position } console.log(reviewData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); for (const business of businesses) { await processBusiness(browser, business, location, retries) } await browser.close(); } async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 5; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { await processResults(file, location, retries); } console.log("Scrape complete");} main();
readCsv() reads our CSV file into an array of JSON objects.processResults() runs processBusiness() on every single one of the rows from the CSV file.await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`);const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); 
const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(url, { timeout: 60000 }); const infoSectionElement = await page.$("script[type='application/ld+json']"); const infoText = await page.evaluate(element => element.textContent, infoSectionElement); const infoSection = JSON.parse(infoText); const listElements = infoSection.itemListElement; let anonCount = 1; for (const element of listElements) { let name = element.author.name; if (name === "Unknown User") { name = `${name}${anonCount}`; anonCount++; } familyFriendly = element.isFamilyFriendly; date = element.uploadDate, position = element.position const reviewData = { name: name, family_friendly: familyFriendly, date: date, position: position } await writeToCsv([reviewData], `${row.name.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); for (const business of businesses) { await processBusiness(browser, business, location, retries) } await browser.close(); } async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 5; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { await processResults(file, location, retries); } console.log("Scrape complete");} main();
We add concurrency to the processResults() function the same way we added concurrency earlier.

```javascript
async function processResults(csvFile, location, concurrencyLimit, retries) {
    const businesses = await readCsv(csvFile);
    const browser = await puppeteer.launch();

    while (businesses.length > 0) {
        const currentBatch = businesses.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(business => processBusiness(browser, business, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }
    await browser.close();
}
```
getScrapeOpsUrl(), we simply need to add it in one more spot.await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 });const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const 
currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 }); const infoSectionElement = await page.$("script[type='application/ld+json']"); const infoText = await page.evaluate(element => element.textContent, infoSectionElement); const infoSection = JSON.parse(infoText); const listElements = infoSection.itemListElement; let anonCount = 1; for (const element of listElements) { let name = element.author.name; if (name === "Unknown User") { name = `${name}${anonCount}`; anonCount++; } familyFriendly = element.isFamilyFriendly; date = element.uploadDate, position = element.position const reviewData = { name: name, family_friendly: familyFriendly, date: date, position: position } await writeToCsv([reviewData], `${row.name.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, concurrencyLimit, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); while (businesses.length > 0) { const currentBatch = businesses.splice(0, concurrencyLimit); const tasks = currentBatch.map(business => processBusiness(browser, business, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close(); } async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 5; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); } console.log("Scrape complete");} main();
Here is our final main function. We're running on 5 pages again.

```javascript
async function main() {
    const keywords = ["restaurants"];
    const concurrencyLimit = 4;
    const pages = 5;
    const location = "uk";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        await startScrape(keyword, pages, location, concurrencyLimit, retries);
        console.log("Crawl complete");
        aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`);
    }

    console.log("Starting scrape");

    for (const file of aggregateFiles) {
        await processResults(file, location, concurrencyLimit, retries);
    }

    console.log("Scrape complete");
}
```
robots.txt here. It's typically legal to scrape data as long as it's publicly available. Public data is any data that's not gated behind a login. If you need to log in to view the data, it is private data. If you have questions about the legality of a scraping job, you should consult an attorney.