Then check out ScrapeOps, the complete toolkit for web scraping.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
    name: str = ""
    rating: float = 0
    text: str = ""
    title: str = ""
    date: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    name = business.get("displayName").lower().replace(" ", "").replace("'", "")
                    trustpilot_formatted = business.get("contact")["website"].split("://")[1]
                    location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    ## Extract Data
                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=business.get("contact")["website"],
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=location.get("country", "n/a"),
                        category=category
                    )
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

def process_business(row, location, retries=3):
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                script = soup.find("script", id="__NEXT_DATA__")
                json_data = json.loads(script.contents[0])
                business_info = json_data["props"]["pageProps"]
                reviews = business_info["reviews"]

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
                for review in reviews:
                    review_data = ReviewData(
                        name=review["consumer"]["displayName"],
                        rating=review["rating"],
                        text=review["text"],
                        title=review["title"],
                        date=review["dates"]["publishedDate"]
                    )
                    review_pipeline.add_data(review_data)
                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To change your results, change the keyword_list. You can adjust any of the following constants as well: MAX_RETRIES, MAX_THREADS, PAGES, LOCATION.

Trustpilot search URLs are laid out like this:
https://www.trustpilot.com/search?query=word1+word2
Individual business pages are built from the company's domain name:
https://www.trustpilot.com/review/actual_website_domain_name
For example, if a business's website is good-bank.de, the Trustpilot URL would be:
https://www.trustpilot.com/review/good-bank.de
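Since a business page URL is just the company's domain appended to a fixed prefix, you can build one in plain Python. A quick illustrative snippet (the domain is only an example):

domain = "good-bank.de"
trustpilot_url = f"https://www.trustpilot.com/review/{domain}"
print(trustpilot_url)
# https://www.trustpilot.com/review/good-bank.de

This mirrors what the scraper does later with its trustpilot_formatted variable.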
Each of these pages embeds its data inside a script tag. The script holds JavaScript, and the JavaScript holds our JSON. Here is the JSON blob from good-bank.de. On both our search results and our business pages, all the information we want is saved in a script tag with an id of "__NEXT_DATA__".

Our search URLs are formatted like this:
https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}
https://www.trustpilot.com/search?query=online+bank&page=1
https://www.trustpilot.com/review/actual_website_domain_name
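Before building the full scraper, here is a minimal sketch of that extraction step in isolation: fetch a page, locate the __NEXT_DATA__ script tag, and parse its contents as JSON. It is a simplified illustration of the approach used throughout this article, not the finished code (and without a proxy the request may get blocked):

import json
import requests
from bs4 import BeautifulSoup

url = "https://www.trustpilot.com/search?query=online+bank&page=1"
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.find("script", id="__NEXT_DATA__")
if script_tag:
    json_data = json.loads(script_tag.contents[0])
    # On search pages, the businesses live under props -> pageProps -> businessUnits
    print(len(json_data["props"]["pageProps"]["businessUnits"]))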
To control our geolocation, we can set the country parameter to "uk", "us", or another country code. When we pass a country into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, our geolocation will show up correctly!

Let's get started. First, create a new project folder:

mkdir trustpilot-scraper
cd trustpilot-scraper
Then create a virtual environment and install our dependencies:

python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4

While we still have retries left and the operation hasn't succeeded, we get the page and find the script tag with the id "__NEXT_DATA__".

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, page_number=0, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    name = business.get("displayName").lower().replace(" ", "").replace("'", "")
                    trustpilot_formatted = business.get("contact")["website"].split("://")[1]
                    location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    ## Extract Data
                    search_data = {
                        "name": business.get("displayName", ""),
                        "stars": business.get("stars", 0),
                        "rating": business.get("trustScore", 0),
                        "num_reviews": business.get("numberOfReviews", 0),
                        "website": business.get("contact")["website"],
                        "trustpilot_url": f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        "location": location.get("country", "n/a"),
                        "category": category
                    }

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries)

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")
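All of the scripts in this article load the ScrapeOps API key from a config.json file in the project folder. The article doesn't show the file itself, but a minimal version that satisfies config["api_key"] would look something like this (the key is a placeholder):

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}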
https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, page_number, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    name = business.get("displayName").lower().replace(" ", "").replace("'", "")
                    trustpilot_formatted = business.get("contact")["website"].split("://")[1]
                    location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    ## Extract Data
                    search_data = {
                        "name": business.get("displayName", ""),
                        "stars": business.get("stars", 0),
                        "rating": business.get("trustScore", 0),
                        "num_reviews": business.get("numberOfReviews", 0),
                        "website": business.get("contact")["website"],
                        "trustpilot_url": f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        "location": location.get("country", "n/a"),
                        "category": category
                    }

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries)

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")
The big change here is the start_scrape() function, which gives us the ability to scrape multiple pages. Later on, we'll add concurrency to this function, but for now, we're just going to use a for loop as a placeholder.

Next we need proper storage, so we'll add a SearchData class and a DataPipeline class. SearchData is a dataclass and its purpose is simply to hold our data. Once we've instantiated a SearchData object, we can pass it into our DataPipeline. Take a look at the updated code.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    name = business.get("displayName").lower().replace(" ", "").replace("'", "")
                    trustpilot_formatted = business.get("contact")["website"].split("://")[1]
                    location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    ## Extract Data
                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=business.get("contact")["website"],
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=location.get("country", "n/a"),
                        category=category
                    )
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, data_pipeline, retries)

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
The DataPipeline creates a pipeline to a CSV file. If the file already exists, we append to it; if it doesn't exist, we create it. When a SearchData object gets passed into our DataPipeline, the DataPipeline filters out duplicates and stores the rest of our relevant data in the CSV file.

Next, we'll add concurrency with ThreadPoolExecutor for multithreading. Our only major difference here is the start_scrape() function. Here is what it looks like now:

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
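If executor.map() with several lists looks odd: it takes the function first, then one iterable per argument of that function, and calls the function once per position across those iterables. A small self-contained sketch of the same pattern (with a stand-in function, not the real scraper):

import concurrent.futures

def show_page(keyword, location, page_number):
    # Stand-in for scrape_search_results(): just report the arguments it received
    print(f"{keyword} | {location} | page {page_number + 1}")

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(
        show_page,
        ["online bank"] * pages,   # same keyword for every call
        ["us"] * pages,            # same location for every call
        range(pages)               # a different page number for each call
    )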
Here is our full code with concurrency added:

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    name = business.get("displayName").lower().replace(" ", "").replace("'", "")
                    trustpilot_formatted = business.get("contact")["website"].split("://")[1]
                    location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    ## Extract Data
                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=business.get("contact")["website"],
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=location.get("country", "n/a"),
                        category=category
                    )
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
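As a quick illustration of what this wrapper produces (the API key is abbreviated and the target URL is just an example):

target = "https://www.trustpilot.com/search?query=online+bank&page=1"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.trustpilot.com%2Fsearch%3Fquery%3Donline%2Bbank%26page%3D1&country=us

All the parameters get URL-encoded into the query string, so the proxy receives the original page URL intact.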
Here is our full code with the proxy function added:

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    name = business.get("displayName").lower().replace(" ", "").replace("'", "")
                    trustpilot_formatted = business.get("contact")["website"].split("://")[1]
                    location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    ## Extract Data
                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=business.get("contact")["website"],
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=location.get("country", "n/a"),
                        category=category
                    )
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
Now let's test it out from our main block. I'm changing a few constants here.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 10
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
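The timings below come from watching the log output. If you'd like to measure a run yourself, one simple (unofficial) way is to wrap the crawl in time.perf_counter():

import time

start = time.perf_counter()
# run start_scrape(...) for each keyword here, exactly as in main above
elapsed = time.perf_counter() - start
print(f"Crawl finished in {elapsed:.2f} seconds")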
PAGES has been set to 10 and LOCATION has been set to "us". Now let's see how long it takes to process 10 pages of data. Here are the results: we processed 10 pages in just over 4 seconds!

With the crawl working, the next step is scraping each individual business page. The process_business() function below handles that:

def process_business(row, location, retries=3):
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                script = soup.find("script", id="__NEXT_DATA__")
                json_data = json.loads(script.contents[0])
                business_info = json_data["props"]["pageProps"]
                reviews = business_info["reviews"]

                for review in reviews:
                    review_data = {
                        "name": review["consumer"]["displayName"],
                        "rating": review["rating"],
                        "text": review["text"],
                        "title": review["title"],
                        "date": review["dates"]["publishedDate"]
                    }
                    print(review_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")
This function takes a row from our CSV file and then fetches the trustpilot_url of the business. Just like before, it looks for the script tag with the id of "__NEXT_DATA__" to find our JSON blob. To actually use our process_business() function, we need to be able to read the rows from our CSV file. Now we're going to fully update our code.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    name = business.get("displayName").lower().replace(" ", "").replace("'", "")
                    trustpilot_formatted = business.get("contact")["website"].split("://")[1]
                    location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    ## Extract Data
                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=business.get("contact")["website"],
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=location.get("country", "n/a"),
                        category=category
                    )
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

def process_business(row, location, retries=3):
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                script = soup.find("script", id="__NEXT_DATA__")
                json_data = json.loads(script.contents[0])
                business_info = json_data["props"]["pageProps"]
                reviews = business_info["reviews"]

                for review in reviews:
                    review_data = {
                        "name": review["consumer"]["displayName"],
                        "rating": review["rating"],
                        "text": review["text"],
                        "title": review["title"],
                        "date": review["dates"]["publishedDate"]
                    }
                    print(review_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_business(row, location, retries)

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Our new process_results() function reads the rows from our CSV file and passes each of them into process_business(). process_business() then pulls our information and prints it to the terminal.

To store this data properly, we'll add a ReviewData class. This class simply holds data, just like our SearchData. We then pass our ReviewData into a DataPipeline, just like we did earlier. Here is the updated code.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
    name: str = ""
    rating: float = 0
    text: str = ""
    title: str = ""
    date: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    name = business.get("displayName").lower().replace(" ", "").replace("'", "")
                    trustpilot_formatted = business.get("contact")["website"].split("://")[1]
                    location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    ## Extract Data
                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=business.get("contact")["website"],
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=location.get("country", "n/a"),
                        category=category
                    )
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

def process_business(row, location, retries=3):
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                script = soup.find("script", id="__NEXT_DATA__")
                json_data = json.loads(script.contents[0])
                business_info = json_data["props"]["pageProps"]
                reviews = business_info["reviews"]

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
                for review in reviews:
                    review_data = ReviewData(
                        name=review["consumer"]["displayName"],
                        rating=review["rating"],
                        text=review["text"],
                        title=review["title"],
                        date=review["dates"]["publishedDate"]
                    )
                    review_pipeline.add_data(review_data)
                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_business(row, location, retries)

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here is our process_results() function refactored for concurrency.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
To finish up, we once again route our requests through the proxy. Only one line of process_business() needs to change:

response = requests.get(get_scrapeops_url(url, location=location))

Here is our final code:

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
    name: str = ""
    rating: float = 0
    text: str = ""
    title: str = ""
    date: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    name = business.get("displayName").lower().replace(" ", "").replace("'", "")
                    trustpilot_formatted = business.get("contact")["website"].split("://")[1]
                    location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    ## Extract Data
                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=business.get("contact")["website"],
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=location.get("country", "n/a"),
                        category=category
                    )
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

def process_business(row, location, retries=3):
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                script = soup.find("script", id="__NEXT_DATA__")
                json_data = json.loads(script.contents[0])
                business_info = json_data["props"]["pageProps"]
                reviews = business_info["reviews"]

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
                for review in reviews:
                    review_data = ReviewData(
                        name=review["consumer"]["displayName"],
                        rating=review["rating"],
                        text=review["text"],
                        title=review["title"],
                        date=review["dates"]["publishedDate"]
                    )
                    review_pipeline.add_data(review_data)
                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Once again, let's update our main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 10
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
We've set PAGES to 10 and our LOCATION to "us". Here are the results.

It took just over 100 seconds (including the time it took to create our initial report) to generate a full report and process all the results (86 rows). This comes out to a speed of about 1.17 seconds per business.

When scraping Trustpilot, pay attention to their robots.txt. You can view their robots.txt file here. Always be careful about the information you extract and don't scrape private or confidential data.

Then check out ScrapeOps, the complete toolkit for web scraping.
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict from selenium import webdriverfrom selenium.webdriver.common.by import By OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us" } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 rating: float = 0 num_reviews: int = 0 website: str = "" trustpilot_url: str = "" location: str = "" category: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" rating: float = 0 text: str = "" title: str = "" date: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): driver = webdriver.Chrome(options=OPTIONS) formatted_keyword = keyword.replace(" ", "+") url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"{keyword}: Fetched page {page_number}") ## Extract Data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") if script_tag: json_data = json.loads(script_tag.get_attribute("innerHTML")) business_units = json_data["props"]["pageProps"]["businessUnits"] for business in business_units: name = business.get("displayName").lower().replace(" ", "").replace("'", "") trustpilot_formatted = business.get("contact")["website"].split("://")[1] location = business.get("location") category_list = business.get("categories") category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a" ## Extract Data search_data = SearchData( name = business.get("displayName", ""), stars = business.get("stars", 0), rating = business.get("trustScore", 0), num_reviews = business.get("numberOfReviews", 0), website = business.get("contact")["website"], trustpilot_url = f"https://www.trustpilot.com/review/{trustpilot_formatted}", location = location.get("country", "n/a"), category = category ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") driver.quit() success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["trustpilot_url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(get_scrapeops_url(url, location=location)) script = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") json_data = json.loads(script.get_attribute("innerHTML")) business_info = json_data["props"]["pageProps"] reviews = business_info["reviews"] review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in reviews: review_data = ReviewData( name= review["consumer"]["displayName"], rating= review["rating"], text= review["text"], title= review["title"], date= review["dates"]["publishedDate"] ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['trustpilot_url']}") logger.warning(f"Retries left: 
{retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['trustpilot_url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
keyword_list: This array contains the search keywords you want to use on Trustpilot to find businesses. Each keyword in the array will be used to perform a separate search and gather data on matching businesses.

MAX_RETRIES: This value sets the number of retry attempts for each scraping task if it fails. More retries increase the chances of successful scraping despite intermittent errors or temporary issues, but also prolong the total scraping time.

MAX_THREADS: This number controls how many scraping tasks are run concurrently. A higher limit can speed up the scraping process but may increase the load on your system and the target website, potentially leading to rate limiting or bans.

PAGES: This value represents the number of pages of search results you want to scrape for each keyword. Each page typically contains a set number of business listings.

LOCATION: This string specifies the geographical location to use for the proxy service. It determines the country from which the scraping requests appear to originate. The location might affect the results due to regional differences in business listings and reviews.

To perform a search on Trustpilot, we use a URL in this format:

https://www.trustpilot.com/search?query=word1+word2
https://www.trustpilot.com/review/actual_website_domain_name
These review URLs are built from the business's domain name. For example, for good-bank.de, the Trustpilot URL would be:

https://www.trustpilot.com/review/good-bank.de
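As a quick illustration, both URL patterns can be built with a couple of string operations; the keyword and domain below are just example values:

# Example values only -- any keyword or business domain works the same way.
keyword = "online bank"
domain = "good-bank.de"

search_url = f"https://www.trustpilot.com/search?query={keyword.replace(' ', '+')}"
review_url = f"https://www.trustpilot.com/review/{domain}"

print(search_url)  # https://www.trustpilot.com/search?query=online+bank
print(review_url)  # https://www.trustpilot.com/review/good-bank.de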
Both page types embed their data inside a script tag. The script holds JavaScript, and the JavaScript holds our JSON. Here is the JSON blob from good-bank.de. On both our search results and our business pages, all the information we want is saved in a script tag with an id of "__NEXT_DATA__" (see the short sketch below).

When paginating search results, we request URLs in this format:

https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}
https://www.trustpilot.com/search?query=online+bank&page=1
https://www.trustpilot.com/review/actual_website_domain_name
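As mentioned above, everything we need is tucked inside the __NEXT_DATA__ script tag. Here is a minimal, standalone sketch of how that blob can be pulled out with Selenium; it assumes the same headless Chrome setup used throughout this article, and without a proxy the request may get blocked:

import json
from selenium import webdriver
from selenium.webdriver.common.by import By

options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
driver.get("https://www.trustpilot.com/search?query=online+bank")

# The page embeds all of its data as JSON inside this script tag
script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__']")
json_data = json.loads(script_tag.get_attribute("innerHTML"))

# On search pages, the business listings live under these keys
business_units = json_data["props"]["pageProps"]["businessUnits"]
print(f"Found {len(business_units)} businesses")

driver.quit()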
If we want to appear in the UK, we set the country parameter to "uk"; for the US, we set it to "us". When we pass a country into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, our geolocation will show up correctly!

Start by creating a new project folder:

mkdir trustpilot-scraper
cd trustpilot-scraper
python -m venv venvsource venv/bin/activatepip install seleniumwhile we still have retries left and the operation hasn't succeeded, we get the page and find the script tag with the id, "__NEXT_DATA__".import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict from selenium import webdriverfrom selenium.webdriver.common.by import By OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): driver = webdriver.Chrome(options=OPTIONS) formatted_keyword = keyword.replace(" ", "+") url = f"https://www.trustpilot.com/search?query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: driver.get(url) logger.info(f"{keyword}: Fetched page {page_number}") ## Extract Data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") if script_tag: json_data = json.loads(script_tag.get_attribute("innerHTML")) business_units = json_data["props"]["pageProps"]["businessUnits"] for business in business_units: name = business.get("displayName").lower().replace(" ", "").replace("'", "") trustpilot_formatted = business.get("contact")["website"].split("://")[1] location = business.get("location") category_list = business.get("categories") category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a" ## Extract Data search_data = { "name": business.get("displayName", ""), "stars": business.get("stars", 0), "rating": business.get("trustScore", 0), "num_reviews": business.get("numberOfReviews", 0), "website": business.get("contact")["website"], "trustpilot_url": f"https://www.trustpilot.com/review/{trustpilot_formatted}", "location": location.get("country", "n/a"), "category": category } print(search_data) logger.info(f"Successfully parsed data from: {url}") driver.quit() success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, page, location, data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
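Every version of the script above reads its ScrapeOps API key from a config.json file in the project folder. Assuming you keep that layout, the file only needs a single field (the key below is a placeholder):

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}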
https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict from selenium import webdriverfrom selenium.webdriver.common.by import By OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): driver = webdriver.Chrome(options=OPTIONS) formatted_keyword = keyword.replace(" ", "+") url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: driver.get(url) logger.info(f"{keyword}: Fetched page {page_number}") ## Extract Data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") if script_tag: json_data = json.loads(script_tag.get_attribute("innerHTML")) business_units = json_data["props"]["pageProps"]["businessUnits"] for business in business_units: name = business.get("displayName").lower().replace(" ", "").replace("'", "") trustpilot_formatted = business.get("contact")["website"].split("://")[1] location = business.get("location") category_list = business.get("categories") category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a" ## Extract Data search_data = { "name": business.get("displayName", ""), "stars": business.get("stars", 0), "rating": business.get("trustScore", 0), "num_reviews": business.get("numberOfReviews", 0), "website": business.get("contact")["website"], "trustpilot_url": f"https://www.trustpilot.com/review/{trustpilot_formatted}", "location": location.get("country", "n/a"), "category": category } print(search_data) logger.info(f"Successfully parsed data from: {url}") driver.quit() success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, page, location, data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
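Trustpilot's result pages are 1-indexed while our range() starts at 0, so page_number+1 maps to the real page. As a quick sketch, here are the URLs a three-page crawl of "online bank" would request:

# Illustration only: the URLs generated for PAGES = 3.
formatted_keyword = "online bank".replace(" ", "+")

for page_number in range(3):
    print(f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}")

# https://www.trustpilot.com/search?query=online+bank&page=1
# https://www.trustpilot.com/search?query=online+bank&page=2
# https://www.trustpilot.com/search?query=online+bank&page=3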
startScrape() function which gives us the ability to scrape multiple pages. Later on, we'll add concurrency to this function, but for now, we're just going to use a for loop as a placeholder.We take in a range() of pages and then we go though and run scrape_search_results() on each page.SearchData class and a DataPipeline class.SearchData is a dataclass and the purpose of it is to simply hold our data. Once we've instantiated the SearchData, we can pass it into our DataPipeline.Take a look at the updated code.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict from selenium import webdriverfrom selenium.webdriver.common.by import By OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 rating: float = 0 num_reviews: int = 0 website: str = "" trustpilot_url: str = "" location: str = "" category: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): driver = webdriver.Chrome(options=OPTIONS) formatted_keyword = keyword.replace(" ", "+") url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: driver.get(url) logger.info(f"{keyword}: Fetched page {page_number}") ## Extract Data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") if script_tag: json_data = json.loads(script_tag.get_attribute("innerHTML")) business_units = json_data["props"]["pageProps"]["businessUnits"] for business in business_units: name = business.get("displayName").lower().replace(" ", "").replace("'", "") trustpilot_formatted = business.get("contact")["website"].split("://")[1] location = business.get("location") category_list = business.get("categories") category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a" ## Extract Data search_data = SearchData( name = business.get("displayName", ""), stars = business.get("stars", 0), rating = business.get("trustScore", 0), num_reviews = business.get("numberOfReviews", 0), website = business.get("contact")["website"], trustpilot_url = f"https://www.trustpilot.com/review/{trustpilot_formatted}", location = location.get("country", "n/a"), category = category ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") driver.quit() success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, page, location, data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
DataPipeline creates a pipeline to a CSV file. If the file already exists, we append to it. If it doesn't exist, we create it. SearchData gets passed into our DataPipeline; the DataPipeline filters out duplicates and stores the rest of our relevant data in a CSV file.

Next, we'll add ThreadPoolExecutor for multithreading. Our only major difference here is the start_scrape() function. Here is what it looks like now:

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
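For clarity, executor.map() takes one element from each of those lists per call, so the threaded start_scrape() behaves roughly like the serial loop below (the function name here is just for illustration), except that up to max_threads pages are fetched at once:

def start_scrape_serial(keyword, pages, location, data_pipeline=None, retries=3):
    # Rough serial equivalent of the threaded version above, for illustration.
    for page in range(pages):
        scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)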
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict from selenium import webdriverfrom selenium.webdriver.common.by import By OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 rating: float = 0 num_reviews: int = 0 website: str = "" trustpilot_url: str = "" location: str = "" category: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): driver = webdriver.Chrome(options=OPTIONS) formatted_keyword = keyword.replace(" ", "+") url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: driver.get(url) logger.info(f"{keyword}: Fetched page {page_number}") ## Extract Data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") if script_tag: json_data = json.loads(script_tag.get_attribute("innerHTML")) business_units = json_data["props"]["pageProps"]["businessUnits"] for business in business_units: name = business.get("displayName").lower().replace(" ", "").replace("'", "") trustpilot_formatted = business.get("contact")["website"].split("://")[1] location = business.get("location") category_list = business.get("categories") category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a" ## Extract Data search_data = SearchData( name = business.get("displayName", ""), stars = business.get("stars", 0), rating = business.get("trustScore", 0), num_reviews = business.get("numberOfReviews", 0), website = business.get("contact")["website"], trustpilot_url = f"https://www.trustpilot.com/review/{trustpilot_formatted}", location = location.get("country", "n/a"), category = category ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") driver.quit() success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Use the location argument so the proxy routes through the requested country
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
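Here is roughly what the wrapped URL ends up looking like; the key below is a placeholder and the exact encoding comes from urlencode():

proxied = get_scrapeops_url("https://www.trustpilot.com/search?query=online+bank", location="us")
print(proxied)
# https://proxy.scrapeops.io/v1/?api_key=YOUR-KEY&url=https%3A%2F%2Fwww.trustpilot.com%2Fsearch%3Fquery%3Donline%2Bbank&country=us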
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict from selenium import webdriverfrom selenium.webdriver.common.by import By OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us" } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 rating: float = 0 num_reviews: int = 0 website: str = "" trustpilot_url: str = "" location: str = "" category: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): driver = webdriver.Chrome(options=OPTIONS) formatted_keyword = keyword.replace(" ", "+") url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: driver.get(url) logger.info(f"{keyword}: Fetched page {page_number}") ## Extract Data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") if script_tag: json_data = json.loads(script_tag.get_attribute("innerHTML")) business_units = json_data["props"]["pageProps"]["businessUnits"] for business in business_units: name = business.get("displayName").lower().replace(" ", "").replace("'", "") trustpilot_formatted = business.get("contact")["website"].split("://")[1] location = business.get("location") category_list = business.get("categories") category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a" ## Extract Data search_data = SearchData( name = business.get("displayName", ""), stars = business.get("stars", 0), rating = business.get("trustScore", 0), num_reviews = business.get("numberOfReviews", 0), website = business.get("contact")["website"], trustpilot_url = f"https://www.trustpilot.com/review/{trustpilot_formatted}", location = location.get("country", "n/a"), category = category ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") driver.quit() success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
Now we'll update our main. I'm changing a few constants here.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 10
    LOCATION = "us"
    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")
PAGES has been set to 10 and LOCATION has been set to "us". Now let's see how long it takes to process 10 pages of data. Here are the results:

We processed 10 pages in roughly 21 seconds. All in all, that comes out to about 2.1 seconds per page!

Next, we add a process_business() function that scrapes the reviews for an individual business.

def process_business(row, location, retries=3):
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(url)
            script = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__']")
            json_data = json.loads(script.get_attribute("innerHTML"))

            business_info = json_data["props"]["pageProps"]
            reviews = business_info["reviews"]

            for review in reviews:
                review_data = {
                    "name": review["consumer"]["displayName"],
                    "rating": review["rating"],
                    "text": review["text"],
                    "title": review["title"],
                    "date": review["dates"]["publishedDate"]
                }
                print(review_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            # Always shut the browser down, even when the request fails
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")
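process_business() expects a dict shaped like one row of the crawl CSV (csv.DictReader gives us exactly that). A hypothetical row, with invented values, looks like this; at this stage only the trustpilot_url field is actually used:

# A made-up row matching the columns our crawler writes.
row = {
    "name": "Example Bank",
    "stars": "4",
    "rating": "4.3",
    "num_reviews": "1200",
    "website": "https://example-bank.com",
    "trustpilot_url": "https://www.trustpilot.com/review/example-bank.com",
    "location": "US",
    "category": "bank"
}

process_business(row, "us", retries=3)  # prints each review as a dict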
row from our CSV file and then fetches the trustpilot_url of the business.script tag with the id of "__NEXT_DATA__" to find our JSON blob.process_business() function, we need to be able to read the rows from our CSV file. Now we're going to fully update our code so we can actually read information from the CSV.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict from selenium import webdriverfrom selenium.webdriver.common.by import By OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us" } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 rating: float = 0 num_reviews: int = 0 website: str = "" trustpilot_url: str = "" location: str = "" category: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): driver = webdriver.Chrome(options=OPTIONS) formatted_keyword = keyword.replace(" ", "+") url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: driver.get(url) logger.info(f"{keyword}: Fetched page {page_number}") ## Extract Data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") if script_tag: json_data = json.loads(script_tag.get_attribute("innerHTML")) business_units = json_data["props"]["pageProps"]["businessUnits"] for business in business_units: name = business.get("displayName").lower().replace(" ", "").replace("'", "") trustpilot_formatted = business.get("contact")["website"].split("://")[1] location = business.get("location") category_list = business.get("categories") category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a" ## Extract Data search_data = SearchData( name = business.get("displayName", ""), stars = business.get("stars", 0), rating = business.get("trustScore", 0), num_reviews = business.get("numberOfReviews", 0), website = business.get("contact")["website"], trustpilot_url = f"https://www.trustpilot.com/review/{trustpilot_formatted}", location = location.get("country", "n/a"), category = category ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") driver.quit() success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["trustpilot_url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) script = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") json_data = json.loads(script.get_attribute("innerHTML")) business_info = json_data["props"]["pageProps"] reviews = business_info["reviews"] for review in reviews: review_data = { "name": review["consumer"]["displayName"], "rating": review["rating"], "text": review["text"], "title": review["title"], "date": review["dates"]["publishedDate"] } print(review_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['trustpilot_url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['trustpilot_url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") 
with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
process_results() function reads the rows from our CSV file and passes each of them into process_business(). process_business() then pulls our information and prints it to the terminal.ReviewData class. This class is going to simply hold data, just like our SearchData.We then pass our ReviewData into a DataPipeline just like we did earlier.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict from selenium import webdriverfrom selenium.webdriver.common.by import By OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us" } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 rating: float = 0 num_reviews: int = 0 website: str = "" trustpilot_url: str = "" location: str = "" category: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" rating: float = 0 text: str = "" title: str = "" date: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): driver = webdriver.Chrome(options=OPTIONS) formatted_keyword = keyword.replace(" ", "+") url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location=location)) logger.info(f"{keyword}: Fetched page {page_number}") ## Extract Data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") if script_tag: json_data = json.loads(script_tag.get_attribute("innerHTML")) business_units = json_data["props"]["pageProps"]["businessUnits"] for business in business_units: name = business.get("displayName").lower().replace(" ", "").replace("'", "") trustpilot_formatted = business.get("contact")["website"].split("://")[1] location = business.get("location") category_list = business.get("categories") category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a" ## Extract Data search_data = SearchData( name = business.get("displayName", ""), stars = business.get("stars", 0), rating = business.get("trustScore", 0), num_reviews = business.get("numberOfReviews", 0), website = business.get("contact")["website"], trustpilot_url = f"https://www.trustpilot.com/review/{trustpilot_formatted}", location = location.get("country", "n/a"), category = category ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") driver.quit() success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["trustpilot_url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url, location=location) script = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") json_data = json.loads(script.get_attribute("innerHTML")) business_info = json_data["props"]["pageProps"] reviews = business_info["reviews"] review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in reviews: review_data = ReviewData( name= review["consumer"]["displayName"], rating= review["rating"], text= review["text"], title= review["title"], date= review["dates"]["publishedDate"] ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['trustpilot_url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries 
exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['trustpilot_url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here is our process_results() function refactored for concurrency.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
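Calling it on the crawl report fans the rows out across the thread pool; for example, assuming the crawl for "online bank" produced online-bank.csv:

# Each row (business) is handed to its own worker thread.
process_results("online-bank.csv", "us", max_threads=5, retries=3)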
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict from selenium import webdriverfrom selenium.webdriver.common.by import By OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us" } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 rating: float = 0 num_reviews: int = 0 website: str = "" trustpilot_url: str = "" location: str = "" category: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" rating: float = 0 text: str = "" title: str = "" date: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): driver = webdriver.Chrome(options=OPTIONS) formatted_keyword = keyword.replace(" ", "+") url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"{keyword}: Fetched page {page_number}") ## Extract Data script_tag = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") if script_tag: json_data = json.loads(script_tag.get_attribute("innerHTML")) business_units = json_data["props"]["pageProps"]["businessUnits"] for business in business_units: name = business.get("displayName").lower().replace(" ", "").replace("'", "") trustpilot_formatted = business.get("contact")["website"].split("://")[1] location = business.get("location") category_list = business.get("categories") category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a" ## Extract Data search_data = SearchData( name = business.get("displayName", ""), stars = business.get("stars", 0), rating = business.get("trustScore", 0), num_reviews = business.get("numberOfReviews", 0), website = business.get("contact")["website"], trustpilot_url = f"https://www.trustpilot.com/review/{trustpilot_formatted}", location = location.get("country", "n/a"), category = category ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") driver.quit() success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["trustpilot_url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(get_scrapeops_url(url, location=location)) script = driver.find_element(By.CSS_SELECTOR, "script[id='__NEXT_DATA__'") json_data = json.loads(script.get_attribute("innerHTML")) business_info = json_data["props"]["pageProps"] reviews = business_info["reviews"] review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in reviews: review_data = ReviewData( name= review["consumer"]["displayName"], rating= review["rating"], text= review["text"], title= review["title"], date= review["dates"]["publishedDate"] ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['trustpilot_url']}") logger.warning(f"Retries left: 
{retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['trustpilot_url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To run this scraper in production, we only need to update the constants in our main.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 10
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
We set our PAGES to 10 and our LOCATION to "us". Here are the results.

It took just over 247 seconds (including the time it took to create our initial report) to generate a full report and process all the results (86 rows). This comes out to a speed of about 2.87 seconds per business.

When scraping Trustpilot, pay attention to their robots.txt. You can view their robots.txt file here.

Always be careful about the information you extract and don't scrape private or confidential data. If a website is hidden behind a login, that is generally considered private data. If your data does not require a login, it is generally considered to be public data. If you have questions about the legality of your scraping job, it is best to consult an attorney familiar with the laws and localities you're dealing with.
const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe( csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true, }) ); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); try { const url = `https://www.trustpilot.com/search?query=${formattedKeyword}&page=${pageNumber + 1}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessUnits = jsonData.props.pageProps.businessUnits; for (const business of businessUnits) { let category = 'n/a'; if ('categories' in business && business.categories.length > 0) { category = business.categories[0].categoryId; } let location = 'n/a'; if ('location' in business && 'country' in business.location) { location = business.location.country; } const trustpilotFormatted = business.contact.website.split('://')[1]; const businessInfo = { name: business.displayName .toLowerCase() .replace(' ', '') .replace("'", ''), stars: business.stars, rating: business.trustScore, num_reviews: business.numberOfReviews, website: business.contact.website, trustpilot_url: `https://www.trustpilot.com/review/${trustpilotFormatted}`, location: location, category: category, }; await writeToCsv([businessInfo], `${keyword.replace(' ', '-')}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape( keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => scrapeSearchResults(browser, keyword, page, location, retries) ); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.trustpilot_url; let tries = 0; let success = false; while 
(tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(getScrapeOpsUrl(url, location)); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessInfo = jsonData.props.pageProps; const reviews = businessInfo.reviews; for (const review of reviews) { const reviewData = { name: review.consumer.displayName, rating: review.rating, text: review.text, title: review.title, date: review.dates.publishedDate, }; await writeToCsv([reviewData], `${row.name}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries - tries}`); tries++; } finally { await page.close(); } }} async function processResults(csvFile, location, concurrencyLimit, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); while (businesses.length > 0) { const currentBatch = businesses.splice(0, concurrencyLimit); const tasks = currentBatch.map((business) => processBusiness(browser, business, location, retries) ); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ['online bank']; const concurrencyLimit = 5; const pages = 1; const location = 'us'; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { await startScrape(keyword, pages, location, concurrencyLimit, retries); aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`); } for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); }} main();
keywords: This array contains the search keywords you want to use on Trustpilot to find businesses. Each keyword in the array will be used to perform a separate search and gather data on matching businesses.

concurrencyLimit: This number controls how many scraping tasks are run concurrently. A higher limit can speed up the scraping process but may increase the load on your system and the target website, potentially leading to rate limiting or bans.

pages: This value represents the number of pages of search results you want to scrape for each keyword. Each page typically contains a set number of business listings.

location: This string specifies the geographical location to use for the proxy service. It determines the country from which the scraping requests appear to originate. The location might affect the results due to regional differences in business listings and reviews.

retries: This value sets the number of retry attempts for each scraping task if it fails. More retries increase the chances of successful scraping despite intermittent errors or temporary issues, but they also prolong the total scraping time. (An example configuration is sketched below.)

Trustpilot search URLs are laid out like this:

https://www.trustpilot.com/search?query=word1+word2
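As a quick illustration of the options above (the values here are arbitrary), a deeper, multi-keyword run might set them like this:

// Illustrative values only — drop these into the main() shown earlier.
const keywords = ['online bank', 'credit union']; // two separate Trustpilot searches
const concurrencyLimit = 3; // at most 3 pages/businesses processed at once
const pages = 5;            // crawl 5 pages of search results per keyword
const location = 'uk';      // route requests through a UK proxy server
const retries = 2;          // retry each failed task up to 2 times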
https://www.trustpilot.com/review/actual_website_domain_name
For example, if a business's website is good-bank.de, the Trustpilot URL would be:

https://www.trustpilot.com/review/good-bank.de
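Since a review page URL is just the bare domain appended to https://www.trustpilot.com/review/, you could make that explicit with a tiny helper. This is a hypothetical sketch, mirroring the inline business.contact.website.split('://')[1] used in the listings:

// Hypothetical helper: derive a Trustpilot review URL from a business website URL.
function buildReviewUrl(websiteUrl) {
  // Strip the protocol ("https://" or "http://") to get the bare domain
  const domain = websiteUrl.split('://')[1];
  return `https://www.trustpilot.com/review/${domain}`;
}

// Example usage:
console.log(buildReviewUrl('https://good-bank.de'));
// -> https://www.trustpilot.com/review/good-bank.de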
All of this data comes embedded in a script tag. The script holds JavaScript, and the JavaScript holds our JSON. Here is the JSON blob from good-bank.de.

On both our search results and our business pages, all the information we want is saved in a script tag with an id of "__NEXT_DATA__". All we have to do is pull this JSON from the page and then parse it.

Our paginated search URLs follow this format:

https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}
https://www.trustpilot.com/search?query=online+bank&page=1
https://www.trustpilot.com/review/actual_website_domain_name
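As noted above, everything we need lives in the __NEXT_DATA__ script tag. Here is a minimal, standalone Puppeteer sketch (assuming Puppeteer is installed) that pulls that JSON out of a page and prints its top-level pageProps keys, just to confirm the blob is there:

const puppeteer = require('puppeteer');

// Minimal sketch: fetch a page and inspect its __NEXT_DATA__ JSON blob.
// The URL below is only an example; any Trustpilot search or review page works the same way.
async function inspectNextData(url) {
  const browser = await puppeteer.launch();
  const page = await browser.newPage();
  try {
    await page.goto(url);
    const script = await page.$("script[id='__NEXT_DATA__']");
    const innerHTML = await page.evaluate((element) => element.innerHTML, script);
    const jsonData = JSON.parse(innerHTML);
    // pageProps is where businessUnits (search pages) and reviews (business pages) live
    console.log(Object.keys(jsonData.props.pageProps));
  } finally {
    await page.close();
    await browser.close();
  }
}

inspectNextData('https://www.trustpilot.com/search?query=online+bank');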
To control our geolocation, we can set the country parameter to "uk", "us", or another supported country code. When we pass a country into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, our geolocation will show up correctly!

To get started, create a new project folder and move into it:

mkdir trustpilot-scraper
cd trustpilot-scraper
npm init --ynpm install puppeteernpm install csv-writernpm install csv-parsenpm install fswhile we still have retries left and the operation hasn't succeeded, we get the page and find the script tag with the id, "__NEXT_DATA__".const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function scrapeSearchResults( browser, keyword, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); try { const url = `https://www.trustpilot.com/search?query=${formattedKeyword}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessUnits = jsonData.props.pageProps.businessUnits; for (const business of businessUnits) { let category = 'n/a'; if ('categories' in business && business.categories.length > 0) { category = business.categories[0].categoryId; } let location = 'n/a'; if ('location' in business && 'country' in business.location) { location = business.location.country; } const trustpilotFormatted = business.contact.website.split('://')[1]; const businessInfo = { name: business.displayName .toLowerCase() .replace(' ', '') .replace("'", ''), stars: business.stars, rating: business.trustScore, num_reviews: business.numberOfReviews, website: business.contact.website, trustpilot_url: `https://www.trustpilot.com/review/${trustpilotFormatted}`, location: location, category: category, }; console.log(businessInfo); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function main() { const keywords = ['online bank']; const location = 'us'; const retries = 3; for (const keyword of keywords) { const browser = await puppeteer.launch(); await scrapeSearchResults(browser, keyword, location, retries); await browser.close(); }} main();
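One caveat about the listings worth flagging: String.prototype.replace() with a string pattern only replaces the first occurrence, so keyword.replace(' ', '+') only converts the first space (the single-space keyword 'online bank' used here is unaffected, and the same applies to the displayName cleanup). For longer phrases, a global replacement is safer:

// keyword.replace(' ', '+') only swaps the FIRST space:
console.log('best online bank'.replace(' ', '+'));    // best+online bank
// A global replacement handles every space:
console.log('best online bank'.replaceAll(' ', '+')); // best+online+bank
// Or, on older Node versions without replaceAll, use a regex with the g flag:
console.log('best online bank'.replace(/ /g, '+'));   // best+online+bank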
https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}
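The page parameter in the URL is 1-indexed while our page numbers start at 0, which is why the code adds 1. As a small, hypothetical helper (not part of the article's listings), the construction looks like this:

// Hypothetical helper: build a Trustpilot search URL for a zero-based page number.
function buildSearchUrl(keyword, pageNumber) {
  const formattedKeyword = keyword.replace(' ', '+');
  // Trustpilot pages are 1-indexed, our page numbers are 0-indexed
  return `https://www.trustpilot.com/search?query=${formattedKeyword}&page=${pageNumber + 1}`;
}

console.log(buildSearchUrl('online bank', 0));
// -> https://www.trustpilot.com/search?query=online+bank&page=1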
const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); try { const url = `https://www.trustpilot.com/search?query=${formattedKeyword}&page=${pageNumber + 1}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessUnits = jsonData.props.pageProps.businessUnits; for (const business of businessUnits) { let category = 'n/a'; if ('categories' in business && business.categories.length > 0) { category = business.categories[0].categoryId; } let location = 'n/a'; if ('location' in business && 'country' in business.location) { location = business.location.country; } const trustpilotFormatted = business.contact.website.split('://')[1]; const businessInfo = { name: business.displayName .toLowerCase() .replace(' ', '') .replace("'", ''), stars: business.stars, rating: business.trustScore, num_reviews: business.numberOfReviews, website: business.contact.website, trustpilot_url: `https://www.trustpilot.com/review/${trustpilotFormatted}`, location: location, category: category, }; console.log(businessInfo); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, location, retries); } await browser.close();} async function main() { const keywords = ['online bank']; const concurrencyLimit = 5; const pages = 1; const location = 'us'; const retries = 3; for (const keyword of keywords) { await startScrape(keyword, pages, location, concurrencyLimit, retries); }} main();
startScrape() function which gives us the ability to scrape multiple pages. Later on, we'll add concurrency to this function, but for now, we're just going to use a for loop as a placeholder.writeToCsv() function. This function takes data (an array of JSON objects) and an outputFile.outputFile exists, we append it.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); try { const url = `https://www.trustpilot.com/search?query=${formattedKeyword}&page=${pageNumber + 1}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessUnits = jsonData.props.pageProps.businessUnits; for (const business of businessUnits) { let category = 'n/a'; if ('categories' in business && business.categories.length > 0) { category = business.categories[0].categoryId; } let location = 'n/a'; if ('location' in business && 'country' in business.location) { location = business.location.country; } const trustpilotFormatted = business.contact.website.split('://')[1]; const businessInfo = { name: business.displayName .toLowerCase() .replace(' ', '') .replace("'", ''), stars: business.stars, rating: business.trustScore, num_reviews: business.numberOfReviews, website: business.contact.website, trustpilot_url: `https://www.trustpilot.com/review/${trustpilotFormatted}`, location: location, category: category, }; await writeToCsv([businessInfo], `${keyword.replace(' ', '-')}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, location, retries); } await browser.close();} async function main() { const keywords = ['online bank']; const concurrencyLimit = 5; const pages = 1; const location = 'us'; const retries = 3; for (const keyword of keywords) { await startScrape(keyword, pages, location, concurrencyLimit, retries); }} main();
businessInfo holds all the information that we scraped. Once we have a businessInfo, we simply write it to CSV with await writeToCsv([businessInfo], `${keyword.replace(" ", "-")}.csv`).

Next, we use async programming and batching in order to process batches of multiple results simultaneously. We use a concurrencyLimit to determine our batch size. While we still have pages to scrape, we splice() out a batch and process it. Once that batch has finished, we move onto the next one.

Our only major difference here is the startScrape() function. Here is what it looks like now:

async function startScrape(
  keyword,
  pages,
  location,
  concurrencyLimit,
  retries
) {
  const pageList = range(0, pages);
  const browser = await puppeteer.launch();

  while (pageList.length > 0) {
    const currentBatch = pageList.splice(0, concurrencyLimit);
    const tasks = currentBatch.map((page) =>
      scrapeSearchResults(browser, keyword, page, location, retries)
    );

    try {
      await Promise.all(tasks);
    } catch (err) {
      console.log(`Failed to process batch: ${err}`);
    }
  }

  await browser.close();
}
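The splice-and-Promise.all pattern above is a general way to cap concurrency without pulling in an extra library. Here is a stripped-down, hypothetical sketch of the same idea (error handling omitted for brevity):

// Generic "process in batches of N" helper (hypothetical, for illustration).
// items: array of inputs, worker: async function, limit: max tasks in flight per batch.
async function processInBatches(items, worker, limit) {
  const queue = [...items]; // copy so we don't mutate the caller's array
  while (queue.length > 0) {
    const batch = queue.splice(0, limit); // take up to `limit` items
    await Promise.all(batch.map((item) => worker(item))); // run the batch concurrently
  }
}

// Example: log numbers 0-9, at most 3 at a time.
processInBatches(
  [...Array(10).keys()],
  async (n) => console.log(`processing ${n}`),
  3
);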
const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); try { const url = `https://www.trustpilot.com/search?query=${formattedKeyword}&page=${pageNumber + 1}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessUnits = jsonData.props.pageProps.businessUnits; for (const business of businessUnits) { let category = 'n/a'; if ('categories' in business && business.categories.length > 0) { category = business.categories[0].categoryId; } let location = 'n/a'; if ('location' in business && 'country' in business.location) { location = business.location.country; } const trustpilotFormatted = business.contact.website.split('://')[1]; const businessInfo = { name: business.displayName .toLowerCase() .replace(' ', '') .replace("'", ''), stars: business.stars, rating: business.trustScore, num_reviews: business.numberOfReviews, website: business.contact.website, trustpilot_url: `https://www.trustpilot.com/review/${trustpilotFormatted}`, location: location, category: category, }; await writeToCsv([businessInfo], `${keyword.replace(' ', '-')}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape( keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => scrapeSearchResults(browser, keyword, page, location, retries) ); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ['online bank']; const concurrencyLimit = 5; const pages = 1; const location = 'us'; const retries = 3; for (const keyword of keywords) { await startScrape(keyword, pages, location, concurrencyLimit, retries); }} main();
function getScrapeOpsUrl(url, location = 'us') {
  const params = new URLSearchParams({
    api_key: API_KEY,
    url: url,
    country: location,
  });
  return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}
getScrapeOpsUrl() takes in all of our parameters and uses simple string formatting to return the proxyUrl that we're going to be using.In this example, our code barely changes at all, but it brings us to a production ready level. Take a look at the full code example below.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); try { const url = `https://www.trustpilot.com/search?query=${formattedKeyword}&page=${pageNumber + 1}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessUnits = jsonData.props.pageProps.businessUnits; for (const business of businessUnits) { let category = 'n/a'; if ('categories' in business && business.categories.length > 0) { category = business.categories[0].categoryId; } let location = 'n/a'; if ('location' in business && 'country' in business.location) { location = business.location.country; } const trustpilotFormatted = business.contact.website.split('://')[1]; const businessInfo = { name: business.displayName .toLowerCase() .replace(' ', '') .replace("'", ''), stars: business.stars, rating: business.trustScore, num_reviews: business.numberOfReviews, website: business.contact.website, trustpilot_url: `https://www.trustpilot.com/review/${trustpilotFormatted}`, location: location, category: category, }; await writeToCsv([businessInfo], `${keyword.replace(' ', '-')}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape( keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => scrapeSearchResults(browser, keyword, page, location, retries) ); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ['online bank']; const concurrencyLimit = 5; const pages = 1; const location = 'us'; const retries = 3; 
for (const keyword of keywords) { await startScrape(keyword, pages, location, concurrencyLimit, retries); }} main();
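To see what getScrapeOpsUrl() actually produces, and how the country routing described earlier comes into play, you can log the proxied URL for a couple of locations. A minimal sketch, assuming the same config.json with your api_key sits alongside the script:

const fs = require('fs');

const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key;

function getScrapeOpsUrl(url, location = 'us') {
  const params = new URLSearchParams({
    api_key: API_KEY,
    url: url,
    country: location,
  });
  return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

const target = 'https://www.trustpilot.com/search?query=online+bank&page=1';

// Same target URL, routed through different countries:
console.log(getScrapeOpsUrl(target, 'us'));
console.log(getScrapeOpsUrl(target, 'uk'));
// e.g. https://proxy.scrapeops.io/v1/?api_key=YOUR_KEY&url=https%3A%2F%2Fwww.trustpilot.com%2F...&country=uk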
As before, production runs are configured in main. I'm changing a few constants here.

async function main() {
  const keywords = ['online bank'];
  const concurrencyLimit = 5;
  const pages = 10;
  const location = 'us';
  const retries = 3;

  for (const keyword of keywords) {
    await startScrape(keyword, pages, location, concurrencyLimit, retries);
  }
}
pages has been set to 10 and location has been set to "us". Now let's see how long it takes to process 10 pages of data. Here are the results:

We processed 10 pages in roughly 9.3 seconds. All in all, it costs us less than a second per page!

Next, we scrape the individual business pages with a processBusiness() function.

async function processBusiness(browser, row, location, retries = 3) {
  const url = row.trustpilot_url;
  let tries = 0;
  let success = false;

  while (tries <= retries && !success) {
    const page = await browser.newPage();
    try {
      await page.goto(url);
      const script = await page.$("script[id='__NEXT_DATA__']");
      const innerHTML = await page.evaluate(
        (element) => element.innerHTML,
        script
      );
      const jsonData = JSON.parse(innerHTML);
      const businessInfo = jsonData.props.pageProps;
      const reviews = businessInfo.reviews;

      for (const review of reviews) {
        const reviewData = {
          name: review.consumer.displayName,
          rating: review.rating,
          text: review.text,
          title: review.title,
          date: review.dates.publishedDate,
        };
        console.log(reviewData);
      }
      success = true;
    } catch (err) {
      console.log(`Error: ${err}, tries left: ${retries - tries}`);
      tries++;
    } finally {
      await page.close();
    }
  }
}
row from our CSV file and then fetches the trustpilot_url of the business.script tag with the id of "__NEXT_DATA__" to find our JSON blob.processBusiness() function, we need to be able to read the rows from our CSV file. Now we're going to fully update our code.In the example below, we also add a processResults() function. processResults() reads the CSV report from our crawler and then processes each business from the report.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe( csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true, }) ); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); try { const url = `https://www.trustpilot.com/search?query=${formattedKeyword}&page=${pageNumber + 1}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessUnits = jsonData.props.pageProps.businessUnits; for (const business of businessUnits) { let category = 'n/a'; if ('categories' in business && business.categories.length > 0) { category = business.categories[0].categoryId; } let location = 'n/a'; if ('location' in business && 'country' in business.location) { location = business.location.country; } const trustpilotFormatted = business.contact.website.split('://')[1]; const businessInfo = { name: business.displayName .toLowerCase() .replace(' ', '') .replace("'", ''), stars: business.stars, rating: business.trustScore, num_reviews: business.numberOfReviews, website: business.contact.website, trustpilot_url: `https://www.trustpilot.com/review/${trustpilotFormatted}`, location: location, category: category, }; await writeToCsv([businessInfo], `${keyword.replace(' ', '-')}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape( keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const 
currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => scrapeSearchResults(browser, keyword, page, location, retries) ); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.trustpilot_url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(url, location); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessInfo = jsonData.props.pageProps; const reviews = businessInfo.reviews; for (const review of reviews) { const reviewData = { name: review.consumer.displayName, rating: review.rating, text: review.text, title: review.title, date: review.dates.publishedDate, }; console.log(reviewData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries - tries}`); tries++; } finally { await page.close(); } }} async function processResults(csvFile, location, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); for (const business of businesses) { await processBusiness(browser, business, location, retries); } await browser.close();} async function main() { const keywords = ['online bank']; const concurrencyLimit = 5; const pages = 1; const location = 'us'; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { await startScrape(keyword, pages, location, concurrencyLimit, retries); aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`); } for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); }} main();
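readCsv() parses each row of the crawler's report into a plain object keyed by the CSV headers, and processBusiness() navigates to its trustpilot_url field. For reference, a row passed into processBusiness() looks roughly like this (the values are illustrative):

// Illustrative only: the shape of one `row` object handed to processBusiness().
// Keys mirror the businessInfo object written by scrapeSearchResults();
// all values come back from the CSV as strings.
const exampleRow = {
  name: 'good-bank',
  stars: '5',
  rating: '4.8',
  num_reviews: '1200',
  website: 'https://good-bank.de',
  trustpilot_url: 'https://www.trustpilot.com/review/good-bank.de',
  location: 'DE',
  category: 'bank',
};

console.log(exampleRow.trustpilot_url); // what processBusiness() navigates to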
process_results() function reads the rows from our CSV file and passes each of them into process_business().process_business() then pulls our information and prints it to the terminal.businessInfo object we used earlier, we now use a reviewData object and then we pass it into the writeToCsv() function again.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe( csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true, }) ); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); try { const url = `https://www.trustpilot.com/search?query=${formattedKeyword}&page=${pageNumber + 1}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessUnits = jsonData.props.pageProps.businessUnits; for (const business of businessUnits) { let category = 'n/a'; if ('categories' in business && business.categories.length > 0) { category = business.categories[0].categoryId; } let location = 'n/a'; if ('location' in business && 'country' in business.location) { location = business.location.country; } const trustpilotFormatted = business.contact.website.split('://')[1]; const businessInfo = { name: business.displayName .toLowerCase() .replace(' ', '') .replace("'", ''), stars: business.stars, rating: business.trustScore, num_reviews: business.numberOfReviews, website: business.contact.website, trustpilot_url: `https://www.trustpilot.com/review/${trustpilotFormatted}`, location: location, category: category, }; await writeToCsv([businessInfo], `${keyword.replace(' ', '-')}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape( keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => 
scrapeSearchResults(browser, keyword, page, location, retries) ); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.trustpilot_url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(url, location); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessInfo = jsonData.props.pageProps; const reviews = businessInfo.reviews; for (const review of reviews) { const reviewData = { name: review.consumer.displayName, rating: review.rating, text: review.text, title: review.title, date: review.dates.publishedDate, }; await writeToCsv([reviewData], `${row.name}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries - tries}`); tries++; } finally { await page.close(); } }} async function processResults(csvFile, location, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); for (const business of businesses) { await processBusiness(browser, business, location, retries); } await browser.close();} async function main() { const keywords = ['online bank']; const concurrencyLimit = 5; const pages = 1; const location = 'us'; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { await startScrape(keyword, pages, location, concurrencyLimit, retries); aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`); } for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); }} main();
Each review gets passed to writeToCsv() as soon as it has been processed. This allows us to store our data efficiently, but also write the absolute most possible data in the event of a crash.

Here is the processResults() function refactored for concurrency.

async function processResults(csvFile, location, concurrencyLimit, retries) {
  const businesses = await readCsv(csvFile);
  const browser = await puppeteer.launch();

  while (businesses.length > 0) {
    const currentBatch = businesses.splice(0, concurrencyLimit);
    const tasks = currentBatch.map((business) =>
      processBusiness(browser, business, location, retries)
    );

    try {
      await Promise.all(tasks);
    } catch (err) {
      console.log(`Failed to process batch: ${err}`);
    }
  }

  await browser.close();
}
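This crash resilience comes from writeToCsv() being constructed with append: fileExists, so each call adds rows to the existing file rather than overwriting it. A small, self-contained sketch of that behavior (the filename and records are illustrative):

const fs = require('fs');
const createCsvWriter = require('csv-writer').createObjectCsvWriter;

// Append one record at a time; earlier rows survive even if a later step crashes.
async function appendRecord(record, outputFile) {
  const fileExists = fs.existsSync(outputFile);
  const csvWriter = createCsvWriter({
    path: outputFile,
    header: Object.keys(record).map((key) => ({ id: key, title: key })),
    append: fileExists, // only write the header on the first call
  });
  await csvWriter.writeRecords([record]);
}

(async () => {
  await appendRecord({ name: 'review one', rating: 5 }, 'demo-reviews.csv');
  await appendRecord({ name: 'review two', rating: 4 }, 'demo-reviews.csv');
  // demo-reviews.csv now holds a header row plus both records.
})();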
await page.goto(getScrapeOpsUrl(url, location));const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe( csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true, }) ); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i = start; i < end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); try { const url = `https://www.trustpilot.com/search?query=${formattedKeyword}&page=${pageNumber + 1}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessUnits = jsonData.props.pageProps.businessUnits; for (const business of businessUnits) { let category = 'n/a'; if ('categories' in business && business.categories.length > 0) { category = business.categories[0].categoryId; } let location = 'n/a'; if ('location' in business && 'country' in business.location) { location = business.location.country; } const trustpilotFormatted = business.contact.website.split('://')[1]; const businessInfo = { name: business.displayName .toLowerCase() .replace(' ', '') .replace("'", ''), stars: business.stars, rating: business.trustScore, num_reviews: business.numberOfReviews, website: business.contact.website, trustpilot_url: `https://www.trustpilot.com/review/${trustpilotFormatted}`, location: location, category: category, }; await writeToCsv([businessInfo], `${keyword.replace(' ', '-')}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape( keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map((page) => scrapeSearchResults(browser, keyword, page, location, retries) ); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = 
row.trustpilot_url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(getScrapeOpsUrl(url, location)); const script = await page.$("script[id='__NEXT_DATA__']"); const innerHTML = await page.evaluate( (element) => element.innerHTML, script ); const jsonData = JSON.parse(innerHTML); const businessInfo = jsonData.props.pageProps; const reviews = businessInfo.reviews; for (const review of reviews) { const reviewData = { name: review.consumer.displayName, rating: review.rating, text: review.text, title: review.title, date: review.dates.publishedDate, }; await writeToCsv([reviewData], `${row.name}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries - tries}`); tries++; } finally { await page.close(); } }} async function processResults(csvFile, location, concurrencyLimit, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); while (businesses.length > 0) { const currentBatch = businesses.splice(0, concurrencyLimit); const tasks = currentBatch.map((business) => processBusiness(browser, business, location, retries) ); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ['online bank']; const concurrencyLimit = 5; const pages = 1; const location = 'us'; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { await startScrape(keyword, pages, location, concurrencyLimit, retries); aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`); } for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); }} main();
Once again, the production run is configured in main.

async function main() {
  const keywords = ['online bank'];
  const concurrencyLimit = 5;
  const pages = 10;
  const location = 'us';
  const retries = 3;
  const aggregateFiles = [];

  for (const keyword of keywords) {
    await startScrape(keyword, pages, location, concurrencyLimit, retries);
    aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`);
  }

  for (const file of aggregateFiles) {
    await processResults(file, location, concurrencyLimit, retries);
  }
}
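If you want to reproduce the timing numbers below, one simple option is to wrap the run with console.time; a minimal sketch:

// Minimal timing sketch: replace the bare `main();` call at the bottom of the
// script with `timedRun();` so the crawl only runs once.
async function timedRun() {
  console.time('full-run');
  await main();                // the main() defined above
  console.timeEnd('full-run'); // logs the total elapsed time
}

timedRun();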
We set pages to 10 and location to "us". Here are the results.

It took just over 121 seconds (including the time it took to create our initial report) to generate a full report and process all the results (86 rows). This comes out to a speed of about 1.41 seconds per business.

When scraping Trustpilot, pay attention to their robots.txt. You can view their robots.txt file here.

Always be careful about the information you extract and don't scrape private or confidential data.