Then check out ScrapeOps, the complete toolkit for web scraping.
config.json
file with your api key and place it in the same folder as this script.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass RestaurantData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") info_section = json.loads(soup.select_one("script[type='application/ld+json']").text) list_elements = info_section["itemListElement"] unknown_count = 1 for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{unknown_count}" unknown_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] restaurant_data = RestaurantData( name=name, 
family_friendly=family_friendly, date=date, position=position ) review_pipeline.add_data(restaurant_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
MAX_RETRIES
: Defines the maximum number of times the script will attempt to retry an operation (such as scraping data) in case of failure.MAX_THREADS
: Sets the maximum number of threads that can run concurrently. It controls how many threads (i.e., parallel tasks) can be used for scraping or processing data.PAGES
: Defines how many pages of search results should be scraped for each keyword.LOCATION
: Specifies the location or country for the search query, which is used in the search URL.keywords_list
. Yelp uses different CSS and layouts for different types of businesses.name
: the name of the business.sponsored
: a boolean variable. If the post is an ad, sponsored
is True
.stars
: how many stars the business has based on overall reviews.rank
: where the business shows up in our search results.review_count
: is the amount of reviews the business has.url
: is the url to the Yelp page for the business.name
: the name of the reviewer.family_friendly
: whether or not they consider the business to be family friendly.date
: the date that the review was uploaded.position
: the position of the review on the page. For instance, the top review would have the position
of 1.https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}
https://www.yelp.com/search?find_desc=restaurants&find_loc=us
/biz/
. We don't have to worry too much about how these urls are constructed because we'll be extracting them straight from the search results.data-testid
of serp-ia-card
. You can take a look below. By identifying these cards, we can go through and extract our needed information from each card.start
param inside of it as well.If we want to start
at 0 (this would be page 0) and get results up to 10, we would pass start=0
. For page 2, we'd pass start=10
.This process repeats all the way down the line. To fetch our batch, we'll call our page number times 10.find_loc
param. We could separate these into two unique variables, but for the purpose of this tutorial, we're going to use the same place.For instance, if we're using a US based proxy server, we'll pass us
in for our search location as well.mkdir yelp-scraper cd yelp-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = { "name": title, "sponsored": sponsored, "stars": rating, "rank": rank, "review_count": review_count, "url": yelp_url } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
"div[data-testid='serp-ia-card']"
alt text
from the image to pull the title
of the businesssponsored
or an actual search result. We use sponsored = card_text[0].isdigit() == False
because all ranked results have a number before their name.sponsored
, we use rank_string = card_text.replace(title, "").split(".")
to split up the ranks. For instance, if the card title is 1. My Cool Business
, we would split at .
and our rank would be 1.a_element
and extract its href
to get the link to the Yelp page of the business.page_number
argument and change one part of our parsing function, the URL:url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}"
start_scrape()
function which allows us to scrape multiple pages.def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries)
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = { "name": title, "sponsored": sponsored, "stars": rating, "rank": rank, "review_count": review_count, "url": yelp_url } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
SearchData
class. The purpose of this one is to simply hold our data while it's waiting to be stored.Then, we'll add a DataPipeline
class. The DataPipeline
is extremely important. This class takes all of our data and pipes it to a CSV file. It also filters out our duplicates.Here is the SearchData
class.@dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
DataPipeline
.class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv()
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
dict
, we use it to create a SearchData
object.search_data
into the data_pipeline
, which stores our data inside of a CSV file.ThreadPoolExecutor
to scrape multiple pages simultaneously on separate threads.To do this, we only need to change one function, start_scrape()
.def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages )
for
loop, we pass scrape_search_results
into executor.map()
along with all of our arguments. Notice that we pass our arguments in as arrays this time. Here is our fully updated crawler.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
wait
parameter also tells the ScrapeOps server to wait for two seconds before sending the page back to us.proxy_url
, but when you're debugging, it comes in really handy.def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
main
. Feel free to change any of the constants here that you'd like. I changed MAX_THREADS
to 4.Exercise caution when changing your keyword_list
. Yelp uses slightly different layouts and CSS for different types of businesses.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
"residential": True
to get_scrapeops_url()
.def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", "wait": 2000, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url
def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") info_section = json.loads(soup.select_one("script[type='application/ld+json']").text) list_elements = info_section["itemListElement"] unknown_count = 1 for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{unknown_count}" unknown_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] restaurant_data = { "name": name, "family_friendly": family_friendly, "date": date, "position": position } print(restaurant_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}")
soup.select_one("script[type='application/ld+json']").text
name
, family_friendly
, date
, and position
from the JSON.process_results()
.At the moment, it uses a for
loop, but we'll add concurrency shortly.def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries)
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") info_section = json.loads(soup.select_one("script[type='application/ld+json']").text) list_elements = info_section["itemListElement"] unknown_count = 1 for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{unknown_count}" unknown_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] restaurant_data = { "name": name, "family_friendly": family_friendly, "date": date, "position": position } print(restaurant_data) success = True else: logger.warning(f"Failed 
Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
DataPipeline
, we just need to create another dataclass
, RestaurantData
.Here is our new class:@dataclassclass RestaurantData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
DataPipeline
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass RestaurantData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") info_section = json.loads(soup.select_one("script[type='application/ld+json']").text) list_elements = info_section["itemListElement"] unknown_count = 1 for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{unknown_count}" unknown_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] restaurant_data = RestaurantData( name=name, family_friendly=family_friendly, date=date, 
position=position ) review_pipeline.add_data(restaurant_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
executor.map()
. As before, our first argument is our parsing function.All subsequent arguments are arrays that get passed into our parsing function.def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) )
def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass RestaurantData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") #possibly need to urlencode location: city + state + zip code url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find("img") title = img.get("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) has_rating = div_card.select_one("div span[data-font-weight='semibold']") rating = 0.0 if len(has_rating.text) > 0: if has_rating.text[0].isdigit(): rating = float(has_rating.text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ")[0] a_element = div_card.find("a") link = a_element.get("href") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") info_section = json.loads(soup.select_one("script[type='application/ld+json']").text) list_elements = info_section["itemListElement"] unknown_count = 1 for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{unknown_count}" unknown_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] restaurant_data = RestaurantData( name=name, 
family_friendly=family_friendly, date=date, position=position ) review_pipeline.add_data(restaurant_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
main
.
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 4
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurants"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
robots.txt
. You can view Yelp's terms here. Their robots.txt
is available here. Remember, if you violate a site's policies, you can get suspended or even permanently banned from using their services. Public data is usually fair game for scraping. When information is not gated behind a login or some other type of authentication, it is considered public. If you need to log in to access the information, it is considered private data. If you have concerns about the legality of your scraper, consult an attorney. Then check out ScrapeOps, the complete toolkit for web scraping.
config.json
file with your ScrapeOps API keys and place it in the same folder as this script.import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.implicitly_wait(10) driver.get(get_scrapeops_url(url, location=location)) try: review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") info_section = json.loads(script.get_attribute("innerHTML")) anon_count = 1 list_elements = info_section["itemListElement"] for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{anon_count}" anon_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] review_data = ReviewData( name=name, family_friendly=family_friendly, date=date, position=position ) 
review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
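The script above won't run until it can read your ScrapeOps API key. Here is a minimal, hypothetical way to create that file; the key value below is a placeholder you'd replace with your own.
# Create config.json next to the script; the scraper reads api_key from it at startup.
import json

with open("config.json", "w") as config_file:
    json.dump({"api_key": "YOUR-SCRAPEOPS-API-KEY"}, config_file)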
config.json
file with your ScrapeOps API key and place it in the same folder as this script. Feel free to tweak any of the following constants:MAX_RETRIES
: Defines the maximum number of times the script will attempt to retry an operation (such as scraping data) in case of failure.MAX_THREADS
: Sets the maximum number of threads that can run concurrently. It controls how many threads (i.e., parallel tasks) can be used for scraping or processing data.PAGES
: Defines how many pages of search results should be scraped for each keyword.LOCATION
: Specifies the location or country for the search query, which is used in the search URL.keyword_list
, make sure to inspect the pages you're scraping. The layout and CSS might change.name
: the name of the business.sponsored
: a boolean variable. If the post is an ad, sponsored
is True
.stars
: how many stars the business has based on overall reviews.rank
: where the business shows up in our search results.review_count
: the number of reviews the business has.url
: the URL of the business's Yelp page.name
: the name of the reviewer.family_friendly
: whether or not they consider the business to be family friendly.date
: the date that the review was uploaded.position
: the position of the review on the page. For instance, the top review would have the position
of 1.https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}
restaurants
in the us
.
https://www.yelp.com/search?find_desc=restaurants&find_loc=us
find_desc
is our search parameter and find_loc
is our location. Take a look at the image below and you can see for yourself./biz/
. We don't need to worry too much about these because we'll be extracting them straight from our search pages. Take a look at the image below.data-testid
of serp-ia-card
. Once we can find these cards, we can go through and extract their information. Take a look at the picture below so you can get a better understanding of this.start
param. We don't need to specify a page number in the URL. Each page holds 10 results, so we multiply the page number by 10.start=0
) will give us results 1 through 10.start=10
) will give us 11 through 20... and so on and so forth.us
in as our country
to the ScrapeOps API, and we'll also pass it into the find_loc
param of our Yelp URL.
mkdir yelp-scraper
cd yelp-scraper
python -m venv venv
source venv/bin/activate
pip install selenium
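Before wiring up the full scraper, you may want to sanity-check the Selenium install. The short sketch below is just a smoke test; it assumes Chrome and a compatible chromedriver are available on your machine.
# Smoke test: open headless Chrome, load a page, print its title, and quit.
from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
try:
    driver.get("https://www.yelp.com")
    print(driver.title)
finally:
    driver.quit()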
import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = { "name": title, "sponsored": sponsored, "stars": stars, "rank": ranking, "review_count": review_count, "url": yelp_url } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
OPTIONS.add_argument("--headless")
sets our browser to run in headless mode. This saves valuable resources.options=OPTIONS
in order to ensure that we're always running in headless mode.driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']")
sponsored = card_text[0].isdigit() == False
. Since all non-sponsored items are ranked, all of them begin with a digit.img
and use its alt
to pull the name of the business, img.get_attribute("alt")
..
and pull the first element from list and convert it to an integer."div span[data-font-weight='semibold']"
. If there is a rating present, we extract it.review
is present. If it is, we once again use the .split()
method to extract the review count.a_element
and get its href
to get the link to the page for each individual business.start
parameter.Take a look at the code below.import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = { "name": title, "sponsored": sponsored, "stars": stars, "rank": ranking, "review_count": review_count, "url": yelp_url } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
start_scrape()
function. At the moment, this doesn't do much other than give us the ability to scrape multiple pages. Later on, we'll add concurrency to this function.SearchData
class to hold our data. Then this data will get passed into our DataPipeline
. This DataPipeline
pipes our data straight to a CSV file while removing duplicates. First, take a look at our SearchData
.
@dataclass
class SearchData:
    name: str = ""
    sponsored: bool = False
    stars: float = 0
    rank: int = 0
    review_count: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
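As a quick illustration of what check_string_fields() does (a hypothetical example, not part of the scraper itself): empty strings get a default value and stray whitespace is stripped.
item = SearchData(name="  Joe's Diner  ", url="")
print(item.name)  # Joe's Diner  (leading/trailing spaces stripped)
print(item.url)   # No url      (empty string replaced with a default)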
DataPipeline
.
class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            sleep(3)  # give an in-progress CSV write time to finish (uses the sleep imported from time)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
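Here is a small, hypothetical example of the pipeline used in isolation, just to show the flow: duplicates (matched by name) are dropped, and close_pipeline() flushes whatever is still sitting in the queue to the CSV.
pipeline = DataPipeline(csv_filename="example.csv")
pipeline.add_data(SearchData(name="Place A", url="https://www.yelp.com/biz/place-a"))
pipeline.add_data(SearchData(name="Place A", url="https://www.yelp.com/biz/place-a"))  # duplicate name, dropped with a warning
pipeline.add_data(SearchData(name="Place B", url="https://www.yelp.com/biz/place-b"))
pipeline.close_pipeline()  # flushes the two unique rows to example.csv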
import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
start_scrape()
earlier? Now it's time to add that concurrency. We'll use ThreadPoolExecutor
to scrape individual pages concurrently. Take a look at this function refactored to use multithreading.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
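If the executor.map() call looks odd, here is a small standalone sketch (not part of the scraper) of how those repeated lists line up: every call receives the same keyword and location, while range(pages) hands each call its own page number, which becomes the start offset in the Yelp URL.
import concurrent.futures

def show_page_url(keyword, location, page):
    # Mirrors how scrape_search_results builds its URL.
    formatted_keyword = keyword.replace(" ", "+")
    return f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page * 10}"

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    for url in executor.map(show_page_url, ["restaurants"] * pages, ["us"] * pages, range(pages)):
        print(url)  # start=0, start=10, start=20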
scrape_search_results
. All other arguments are the arguments passed into this function. We pass these in as arrays so they can then get passed in to each function call.Our full code now looks like this.import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    print(proxy_url)
    return proxy_url
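To make the wrapping concrete, here's roughly what a proxied URL looks like for one of our search pages (output trimmed; the exact query string depends on your API key and the urlencoded target).
target = "https://www.yelp.com/search?find_desc=restaurants&find_loc=us&start=0"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.yelp.com%2Fsearch%3F...&country=us&residential=True&wait=2000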
get_scrapeops_url()
so it can return our proxied url with all these desired traits.url
: the url we'd like to scrape.country
: the country we want to be routed through by the API.residential
: we want to use the residential proxy service. This greatly increases our chances of success when compared to a data center proxy.wait
: we want ScrapeOps to wait 2 seconds for content to render before sending it back to us.import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
main
below. We'll be scraping 5 pages of search results.
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 4
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurants"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
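If you want to crawl more than one search term, keyword_list is the only thing you need to touch. A hypothetical two-keyword run would look like the snippet below; each keyword still gets its own CSV, named after the keyword with spaces replaced by dashes.
## INPUT ---> List of keywords to scrape
keyword_list = ["restaurants", "coffee shops"]
# After the crawl, aggregate_files would hold ["restaurants.csv", "coffee-shops.csv"].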
MAX_THREADS
MAX_RETRIES
LOCATION
PAGES
script
item on the page and pulls JSON data from that item.Here is our process_business()
function.
def process_business(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        driver.implicitly_wait(10)
        driver.get(url)
        try:
            script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
            info_section = json.loads(script.get_attribute("innerHTML"))

            anon_count = 1
            list_elements = info_section["itemListElement"]
            for element in list_elements:
                name = element["author"]["name"]
                if name == "Unknown User":
                    name = f"{name}{anon_count}"
                    anon_count += 1
                family_friendly = element["isFamilyFriendly"]
                date = element.get("uploadDate")
                position = element["position"]

                review_data = {
                    "name": name,
                    "family_friendly": family_friendly,
                    "date": date,
                    "position": position
                }
                print(review_data)
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
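The function above leans entirely on the JSON-LD blob Yelp embeds in the page. As a rough illustration (the field names mirror what the code reads; the real payload carries many more keys and entries), the structure it walks looks something like this:
sample_info_section = {
    "itemListElement": [
        {
            "author": {"name": "Unknown User"},
            "isFamilyFriendly": True,
            "uploadDate": "2024-01-15",
            "position": 1
        }
    ]
}

for element in sample_info_section["itemListElement"]:
    print(element["author"]["name"], element["position"], element.get("uploadDate"))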
script
element with driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
.json.loads()
on its innerHTML
.dict
returned by json.loads()
.process_business()
on each of the rows from the file.Here is our process_results()
function.
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries=retries)
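At this stage, a hypothetical run of the scraper half of the project is just one call; the filename matches what the crawl wrote for the "restaurants" keyword.
# Read the crawl output and pull reviews for every business it lists.
process_results("restaurants.csv", "us", retries=3)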
import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.implicitly_wait(10) driver.get(url) try: script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") info_section = json.loads(script.get_attribute("innerHTML")) anon_count = 1 list_elements = info_section["itemListElement"] for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{anon_count}" anon_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] review_data = { "name": name, "family_friendly": family_friendly, "date": date, "position": position } print(review_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") 
logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
ReviewData
. It's almost identical to the SearchData
class from earlier. It just holds slightly different information.Take a look at ReviewData
.
@dataclass
class ReviewData:
    name: str = ""
    family_friendly: bool = False
    date: str = ""
    position: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
DataPipeline
. This version of our script does exactly that.import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.implicitly_wait(10) driver.get(url) try: review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") info_section = json.loads(script.get_attribute("innerHTML")) anon_count = 1 list_elements = info_section["itemListElement"] for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{anon_count}" anon_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] review_data = ReviewData( name=name, family_friendly=family_friendly, date=date, position=position ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() 
success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
ThreadPoolExecutor
.Let's rewrite process_results()
to do just that.
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
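One practical note, offered as a suggestion rather than a rule: every worker in this pool launches its own headless Chrome, so max_threads effectively caps how many browsers run at once. A hypothetical conservative call looks like this.
# Two concurrent browsers is a safe starting point; raise it once you know
# how much memory each headless Chrome instance costs on your machine.
process_results("restaurants.csv", "us", max_threads=2, retries=3)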
get_scrapeops_url()
. All that changes is one line of our parsing function.driver.get(get_scrapeops_url(url, location=location))
import osimport csvimport jsonimport loggingfrom time import sleepfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" sponsored: bool = False stars: float = 0 rank: int = 0 review_count: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" family_friendly: bool = False date: str = "" position: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='serp-ia-card']") for div_card in div_cards: card_text = div_card.text sponsored = card_text[0].isdigit() == False ranking = None img = div_card.find_element(By.CSS_SELECTOR, "img") title = img.get_attribute("alt") if not sponsored: rank_string = card_text.replace(title, "").split(".") if len(rank_string) > 0: ranking = int(rank_string[0]) rating = 0.0 has_rating = driver.find_elements(By.CSS_SELECTOR, "div span[data-font-weight='semibold']") if len(has_rating[0].text) > 0: if has_rating.text[0].isdigit(): has_rating = float(rating[0].text) review_count = 0 if "review" in card_text: review_count = card_text.split("(")[1].split(")")[0].split(" ") a_element = div_card.find_element(By.CSS_SELECTOR, "a") link = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") yelp_url = f"https://www.yelp.com{link}" search_data = SearchData( name=title, sponsored=sponsored, stars=rating, rank=ranking, review_count=review_count, url=yelp_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.implicitly_wait(10) driver.get(get_scrapeops_url(url, location=location)) try: review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") info_section = json.loads(script.get_attribute("innerHTML")) anon_count = 1 list_elements = info_section["itemListElement"] for element in list_elements: name = element["author"]["name"] if name == "Unknown User": name = f"{name}{anon_count}" anon_count += 1 family_friendly = element["isFamilyFriendly"] date = element.get("uploadDate") position = element["position"] review_data = ReviewData( name=name, family_friendly=family_friendly, date=date, position=position ) 
review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 4 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["restaurants"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Now it's time to test everything in production. Here is our main. Once again, we'll start with a 5 page crawl.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 4
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["restaurants"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
When scraping Yelp, always pay attention to their terms of service and robots.txt. You can view Yelp's robots.txt here.

Scraping public information is generally considered legal. Public information on the web is any information that is not gated behind a login. If you need to log in to view the data, it is considered private data. If you have questions about the legality of a scraping job, you should consult an attorney.

Then check out ScrapeOps, the complete toolkit for web scraping.
config.json
with your ScrapeOps API keys to the folder.Yelp is very good at blocking scrapers but you don't need to worry about it because this one comes pre-built with support for the ScrapeOps Residential Proxy!const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = 
await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 }); const infoSectionElement = await page.$("script[type='application/ld+json']"); const infoText = await page.evaluate(element => element.textContent, infoSectionElement); const infoSection = JSON.parse(infoText); const listElements = infoSection.itemListElement; let anonCount = 1; for (const element of listElements) { let name = element.author.name; if (name === "Unknown User") { name = `${name}${anonCount}`; anonCount++; } familyFriendly = element.isFamilyFriendly; date = element.uploadDate, position = element.position const reviewData = { name: name, family_friendly: familyFriendly, date: date, position: position } await writeToCsv([reviewData], `${row.name.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, concurrencyLimit, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); while (businesses.length > 0) { const currentBatch = businesses.splice(0, concurrencyLimit); const tasks = currentBatch.map(business => processBusiness(browser, business, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close(); } async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 5; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); } console.log("Scrape complete");} main();
Feel free to change any of the following constants in main():

- concurrencyLimit: Limits the number of simultaneous tasks (or browser instances/pages) that can run concurrently.
- pages: Determines how many pages of search results to scrape for each keyword.
- location: Specifies the geographic location for the search.
- retries: Sets the number of retry attempts if a scraping task fails due to an error (e.g., network issues or proxy blocks).

You can also change the keywords, but you need to be cautious when doing so. Yelp uses different CSS and page layouts for different types of businesses. If you add online bank to the keywords, it will break the scraper. If you decide to change the keywords, make sure you inspect the page for your search and adjust the parsing function to fit the page layout.

Yelp search URLs are laid out in the following format:

https://www.yelp.com/search?find_desc={formatted_keyword}&find_loc={location}
If we want to find restaurants in the us, we would use the description parameter find_desc=restaurants and the location parameter find_loc=us. So our complete URL would be:

https://www.yelp.com/search?find_desc=restaurants&find_loc=us
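To make that concrete, here's a quick sketch of building the search URL in code. The buildSearchUrl helper is just for illustration; the actual scraper builds this string inline.

// Hypothetical helper: builds a Yelp search URL from a keyword and a location.
function buildSearchUrl(keyword, location) {
    const formattedKeyword = keyword.replace(/ /g, "+");
    return `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}`;
}

console.log(buildSearchUrl("restaurants", "us"));
// https://www.yelp.com/search?find_desc=restaurants&find_loc=us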
Yelp business page URLs contain /biz/ and then the name of the business. The URLs here aren't too much of a concern because we're pulling them from the search pages. You can take a look at a Yelp business page below. As you can see, the review data is embedded in a script tag.

Yelp's restaurant results all contain a data-testid of serp-ia-card. You can see it in the image below.

To paginate our results, we use the start parameter. This parameter doesn't actually take our page number; it takes our result number. Yelp gives us 10 results per page, so page N starts at result N * 10 (see the short sketch below).
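Here's a tiny sketch of that mapping, purely for illustration:

// Yelp shows 10 results per page, so the start offset for a given page is pageNumber * 10.
for (const pageNumber of [0, 1, 2]) {
    console.log(`page ${pageNumber} -> &start=${pageNumber * 10}`);
}
// page 0 -> &start=0
// page 1 -> &start=10
// page 2 -> &start=20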
We control our location with the find_loc parameter. If we pass us into the ScrapeOps API, we will be routed through a server in the US. If we pass us into the find_loc parameter, Yelp will give us restaurants in the US.

Before writing any code, create a new project folder and move into it:

mkdir yelp-scraper

cd yelp-scraper
npm init --y
npm install puppeteer
npm install csv-writer
npm install csv-parse
npm install fs
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function scrapeSearchResults(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } console.log(searchData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, location, retries) { const browser = await puppeteer.launch() await scrapeSearchResults(browser, keyword, location, retries); await browser.close();} async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 1; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, location, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } } main();
In the code above, pay attention to scrapeSearchResults() particularly:

- await page.$$("div[data-testid='serp-ia-card']") finds all of the result cards on the page.
- We find the image in each card: const img = await divCard.$("img");
- We get the alt for the image, await page.evaluate(element => element.getAttribute("alt"), img). This alt text is the name of the business.
- cardText.replace(name, ""); creates a variable of the title without the actual name of the business.
- let sponsored = isNaN(nameRemoved[0]). All actual results come with a ranking number. If there is no rank in the title, the card is a sponsored ad.
- For real results, we split the remaining text on the "." and convert it to a number, Number(rankString[0]). This gives us the ranking number of the result. (There's a small standalone sketch of this parsing after the pagination code below.)
- We check for div span[data-font-weight='semibold'] and, if there's a rating present, we pull it from the card.
- Finally, we find the aElement and extract its href in order to get the Yelp page for the business.

To fetch more pages, we add pageNumber * 10 to the start param of our URL. Take a look at the code below; we added a couple more things, a range() function, and we tweaked the startScrape()
function to support scraping multiple pages.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } console.log(searchData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, location, retries); } await browser.close();} async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 1; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } } main();
Our URL now includes the start offset:

https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}
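As promised above, here's a small standalone sketch of the sponsored/rank parsing. The card text below is invented for illustration; in the scraper it comes from page.evaluate on the card element.

// Hypothetical card text: organic results start with their rank, sponsored cards don't.
const cardText = "3. Tony's Pizza4.5(212 reviews)...";
const name = "Tony's Pizza";

const nameRemoved = cardText.replace(name, "");

// Organic results start with a digit, so isNaN() on the first character flags sponsored cards.
const sponsored = isNaN(nameRemoved[0]);

let rank = 0;
if (!sponsored) {
    // "3. 4.5(212 reviews)..." split on "." gives ["3", " 4", "5(212 reviews)", ...]
    rank = Number(nameRemoved.split(".")[0]);
}

let reviewCount = "0";
if (cardText.includes("review")) {
    reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0];
}

console.log({ sponsored, rank, reviewCount });
// { sponsored: false, rank: 3, reviewCount: '212' }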
Now we need to store our data. When we write to a CSV, we open the file in append mode if the file exists. If the file doesn't exist, our function needs to create it. It also takes in an array of JSON objects, so whenever we pass something into this function, we need to pass it in as an array. Here is writeToCsv().

async function writeToCsv(data, outputFile) {
    if (!data || data.length === 0) {
        throw new Error("No data to write!");
    }

    const fileExists = fs.existsSync(outputFile);
    const headers = Object.keys(data[0]).map(key => ({id: key, title: key}));

    const csvWriter = createCsvWriter({
        path: outputFile,
        header: headers,
        append: fileExists
    });

    try {
        await csvWriter.writeRecords(data);
    } catch (e) {
        throw new Error("Failed to write to csv");
    }
}
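Here's a quick usage sketch; the data and the test.csv filename are made up, but the append behavior is the point:

(async () => {
    // First call creates test.csv and writes a header row.
    await writeToCsv([{ name: "Example Restaurant", stars: 4.5 }], "test.csv");
    // Second call sees the file exists and appends without rewriting the header.
    await writeToCsv([{ name: "Another Restaurant", stars: 3.0 }], "test.csv");
})();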
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, location, retries); } await browser.close();} async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 1; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } } main();
Next, we'll add concurrency to startScrape(). Instead of a for loop, we're going to shrink our pageList by splicing up to the concurrencyLimit and running scrapeSearchResults() on each page that got spliced. We then await all of these tasks and splice() again. We continue this process until the array shrinks down to nothing. Here is our refactored function.

async function startScrape(keyword, pages, location, concurrencyLimit, retries) {
    const pageList = range(0, pages);
    const browser = await puppeteer.launch();

    while (pageList.length > 0) {
        const currentBatch = pageList.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }

    await browser.close();
}
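The splice-and-await pattern is easier to see with a toy example. This sketch has nothing to do with Yelp; it just shows async tasks being processed concurrencyLimit at a time:

// Process items in batches, awaiting each batch before starting the next one.
async function runInBatches(items, concurrencyLimit, task) {
    const queue = [...items];
    while (queue.length > 0) {
        const currentBatch = queue.splice(0, concurrencyLimit);
        await Promise.all(currentBatch.map(task));
    }
}

// Fake async task: waits 100ms, then logs the item it was given.
const fakeTask = item =>
    new Promise(resolve => setTimeout(() => {
        console.log(`finished ${item}`);
        resolve();
    }, 100));

runInBatches([1, 2, 3, 4, 5, 6], 2, fakeTask);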
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 1; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } } 
main();
Inside the refactored function:

- const currentBatch = pageList.splice(0, concurrencyLimit); pulls a batch of pages off the front of pageList.
- currentBatch.map(page => scrapeSearchResults(...)) creates a list of tasks to run.
- await Promise.all(tasks); runs those tasks and waits for all of them to finish.
- We repeat this process until pageList is gone.

Next, we need to get past Yelp's anti-bots. getScrapeOpsUrl() takes a regular URL and converts it into a ScrapeOps proxied URL.

function getScrapeOpsUrl(url, location="us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location,
        residential: true,
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}
- api_key: holds our ScrapeOps API key.
- url: is the url of the website we'd like to scrape.
- country: is the country you'd like to be routed through.
- residential
: allows us to use a residential IP address. This greatly increases our chances of getting through anti-bots because we're not showing up at a data center IP.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } 
} await browser.close();} async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 1; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } } main();
Now let's test the crawler in production with our main on 5 pages of results.

async function main() {
    const keywords = ["restaurants"];
    const concurrencyLimit = 4;
    const pages = 5;
    const location = "uk";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        await startScrape(keyword, pages, location, concurrencyLimit, retries);
        console.log("Crawl complete");
        aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`);
    }
}
Now we need to scrape reviews from each business in our crawl. As we saw earlier, the review data on a business page is embedded in a script tag, so we find that tag and extract JSON from it.

async function processBusiness(browser, row, location, retries = 3) {
    const url = row.url;
    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const page = await browser.newPage();

        try {
            await page.goto(url, { timeout: 60000 });

            const infoSectionElement = await page.$("script[type='application/ld+json']");
            const infoText = await page.evaluate(element => element.textContent, infoSectionElement);
            const infoSection = JSON.parse(infoText);
            const listElements = infoSection.itemListElement;

            let anonCount = 1;

            for (const element of listElements) {
                let name = element.author.name;
                if (name === "Unknown User") {
                    name = `${name}${anonCount}`;
                    anonCount++;
                }

                const familyFriendly = element.isFamilyFriendly;
                const date = element.uploadDate;
                const position = element.position;

                const reviewData = {
                    name: name,
                    family_friendly: familyFriendly,
                    date: date,
                    position: position
                };

                console.log(reviewData);
            }

            success = true;
        } catch (err) {
            console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`);
            tries++;
        } finally {
            await page.close();
        }
    }
}
We first find the script tag that holds our JSON, await page.$("script[type='application/ld+json']");. From each review inside that JSON we pull the following fields (see the short sketch below):

- name: the name of the person who wrote the review.
- familyFriendly: whether or not the restaurant is family friendly.
- date: the date the review was uploaded.
- position: the position that the review shows up on the page.
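To make the extraction concrete, here's an isolated sketch using a hand-made JSON-LD snippet. Real Yelp pages contain many more fields than this; the sample is invented purely for illustration.

// Invented sample shaped like the itemListElement entries we pull from the page.
const infoSection = JSON.parse(`{
    "itemListElement": [
        {"author": {"name": "Jane D."}, "isFamilyFriendly": true, "uploadDate": "2024-01-15", "position": 1},
        {"author": {"name": "Unknown User"}, "isFamilyFriendly": false, "uploadDate": "2024-01-10", "position": 2}
    ]
}`);

let anonCount = 1;
for (const element of infoSection.itemListElement) {
    let name = element.author.name;
    if (name === "Unknown User") {
        name = `${name}${anonCount}`;  // give each anonymous reviewer a unique name
        anonCount++;
    }
    console.log({
        name: name,
        family_friendly: element.isFamilyFriendly,
        date: element.uploadDate,
        position: element.position
    });
}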
Our crawler saved its results to a CSV file, so to feed those rows into the scraper we need a readCsv() function.

async function readCsv(inputFile) {
    const results = [];
    const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({
        columns: true,
        delimiter: ",",
        trim: true,
        skip_empty_lines: true
    }));

    for await (const record of parser) {
        results.push(record);
    }
    return results;
}
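As a quick sanity check, you could read the crawl file back in and look at a row. The restaurants.csv name assumes you crawled the "restaurants" keyword:

(async () => {
    // Each record is an object keyed by the CSV headers (name, sponsored, stars, rank, review_count, url).
    const rows = await readCsv("restaurants.csv");
    console.log(`Loaded ${rows.length} businesses`);
    console.log(rows[0]);
})();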
Each row then gets passed into processBusiness() by our processResults() function.

async function processResults(csvFile, location, retries) {
    const businesses = await readCsv(csvFile);
    const browser = await puppeteer.launch();

    for (const business of businesses) {
        await processBusiness(browser, business, location, retries);
    }
    await browser.close();
}
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, 
location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(url, { timeout: 60000 }); const infoSectionElement = await page.$("script[type='application/ld+json']"); const infoText = await page.evaluate(element => element.textContent, infoSectionElement); const infoSection = JSON.parse(infoText); const listElements = infoSection.itemListElement; let anonCount = 1; for (const element of listElements) { let name = element.author.name; if (name === "Unknown User") { name = `${name}${anonCount}`; anonCount++; } familyFriendly = element.isFamilyFriendly; date = element.uploadDate, position = element.position const reviewData = { name: name, family_friendly: familyFriendly, date: date, position: position } console.log(reviewData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); for (const business of businesses) { await processBusiness(browser, business, location, retries) } await browser.close(); } async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 5; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { await processResults(file, location, retries); } console.log("Scrape complete");} main();
In the full code above:

- readCsv() reads our CSV file into an array of JSON objects.
- processResults() runs processBusiness() on every single one of the rows from the CSV file.

We already store our search results during the crawl with await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`);. Now we do the same thing inside processBusiness(), writing each reviewData object to a CSV file named after the business.
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, 
location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(url, { timeout: 60000 }); const infoSectionElement = await page.$("script[type='application/ld+json']"); const infoText = await page.evaluate(element => element.textContent, infoSectionElement); const infoSection = JSON.parse(infoText); const listElements = infoSection.itemListElement; let anonCount = 1; for (const element of listElements) { let name = element.author.name; if (name === "Unknown User") { name = `${name}${anonCount}`; anonCount++; } familyFriendly = element.isFamilyFriendly; date = element.uploadDate, position = element.position const reviewData = { name: name, family_friendly: familyFriendly, date: date, position: position } await writeToCsv([reviewData], `${row.name.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); for (const business of businesses) { await processBusiness(browser, business, location, retries) } await browser.close(); } async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 5; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { await processResults(file, location, retries); } console.log("Scrape complete");} main();
Next, we add concurrency to our processResults() function the same way we added concurrency earlier.

async function processResults(csvFile, location, concurrencyLimit, retries) {
    const businesses = await readCsv(csvFile);
    const browser = await puppeteer.launch();

    while (businesses.length > 0) {
        const currentBatch = businesses.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(business => processBusiness(browser, business, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }

    await browser.close();
}
Since we already have getScrapeOpsUrl(), we simply need to add it in one more spot.

await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 });
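If you want to eyeball the proxied URL before a full run, a throwaway log like this works; the business URL below is a placeholder, and the output assumes config.json holds a valid key:

// Prints something like: https://proxy.scrapeops.io/v1/?api_key=...&url=...&country=uk&residential=true
const exampleBusinessUrl = "https://www.yelp.com/biz/example-business";
console.log(getScrapeOpsUrl(exampleBusinessUrl, "uk"));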
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.yelp.com/search?find_desc=${formattedKeyword}&find_loc=${location}&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-testid='serp-ia-card']"); for (const divCard of divCards) { const cardText = await page.evaluate(element => element.textContent, divCard); const img = await divCard.$("img"); const name = await page.evaluate(element => element.getAttribute("alt"), img); const nameRemoved = cardText.replace(name, ""); let sponsored = isNaN(nameRemoved[0]); let rank = 0; if (!sponsored) { rankString = nameRemoved.split("."); rank = Number(rankString[0]); } let rating = 0.0; const hasRating = await divCard.$("div span[data-font-weight='semibold']"); if (hasRating) { const ratingText = await page.evaluate(element => element.textContent, hasRating); if (ratingText.length > 0) { rating = Number(ratingText); } } let reviewCount = "0"; if (cardText.includes("review")) { reviewCount = cardText.split("(")[1].split(")")[0].split(" ")[0]; } const aElement = await divCard.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aElement); const yelpUrl = `https://www.yelp.com${link.replace("https://proxy.scrapeops.io", "")}` const searchData = { name: name, sponsored: sponsored, stars: rating, rank: rank, review_count: reviewCount, url: yelpUrl } await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, 
location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 }); const infoSectionElement = await page.$("script[type='application/ld+json']"); const infoText = await page.evaluate(element => element.textContent, infoSectionElement); const infoSection = JSON.parse(infoText); const listElements = infoSection.itemListElement; let anonCount = 1; for (const element of listElements) { let name = element.author.name; if (name === "Unknown User") { name = `${name}${anonCount}`; anonCount++; } familyFriendly = element.isFamilyFriendly; date = element.uploadDate, position = element.position const reviewData = { name: name, family_friendly: familyFriendly, date: date, position: position } await writeToCsv([reviewData], `${row.name.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, concurrencyLimit, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); while (businesses.length > 0) { const currentBatch = businesses.splice(0, concurrencyLimit); const tasks = currentBatch.map(business => processBusiness(browser, business, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close(); } async function main() { const keywords = ["restaurants"]; const concurrencyLimit = 4; const pages = 5; const location = "uk"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); } console.log("Scrape complete");} main();
Now for the final production run. Here is our main function. We're running on 5 pages again.

async function main() {
    const keywords = ["restaurants"];
    const concurrencyLimit = 4;
    const pages = 5;
    const location = "uk";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        await startScrape(keyword, pages, location, concurrencyLimit, retries);
        console.log("Crawl complete");
        aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`);
    }

    console.log("Starting scrape");
    for (const file of aggregateFiles) {
        await processResults(file, location, concurrencyLimit, retries);
    }
    console.log("Scrape complete");
}
As always, pay attention to Yelp's terms of service and robots.txt. You can view their robots.txt here. It's typically legal to scrape data as long as it's publicly available. Public data is any data that's not gated behind a login. If you need to log in to view the data, it is private data. If you have questions about the legality of a scraping job, you should consult an attorney.