Then check out ScrapeOps, the complete toolkit for web scraping.
Here is the finished scraper we build in this guide:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,  # route the request through the requested country
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    g2_url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    job_title: str = ""
    rating: float = 0
    full_review: str = ""
    review_source: str = ""
    validated: bool = False
    incentivized: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")

            for div_card in div_cards:
                name = div_card.find("div", class_="product-listing__product-name")
                g2_url = name.find("a").get("href")
                has_rating = div_card.find("span", class_="fw-semibold")
                rating = 0.0
                if has_rating:
                    rating = has_rating.text
                description = div_card.find("p").text

                search_data = SearchData(
                    name=name.text,
                    stars=rating,
                    g2_url=g2_url,
                    description=description
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1  # count this attempt so we don't retry forever

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_business(row, location, retries=3):
    url = row["g2_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")
                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

                anon_count = 0
                for review_card in review_cards:
                    review_date = review_card.find("time")
                    if review_date:
                        date = review_date.get("datetime")

                        name_present = review_card.find("a", class_="link--header-color")
                        name = name_present.text if name_present else "anonymous"
                        if name == "anonymous":
                            name = f"{name}-{anon_count}"
                            anon_count += 1

                        job_title_present = review_card.find("div", class_="mt-4th")
                        job_title = job_title_present.text if job_title_present else "n/a"

                        rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
                        rating_div = rating_container.find("div")
                        rating_class = rating_div.get("class")
                        stars_string = rating_class[-1]
                        stars_large_number = float(stars_string.split("-")[-1])
                        stars_clean_number = stars_large_number/2

                        review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text

                        info_container = review_card.find("div", class_="tags--teal")
                        incentives_dirty = info_container.find_all("div")
                        incentives_clean = []
                        source = ""
                        for incentive in incentives_dirty:
                            if incentive.text not in incentives_clean:
                                if "Review source:" in incentive.text:
                                    source = incentive.text.split(": ")[-1]
                                else:
                                    incentives_clean.append(incentive.text)
                        validated = "Validated Reviewer" in incentives_clean
                        incentivized = "Incentivized Review" in incentives_clean

                        review_data = ReviewData(
                            name=name,
                            date=date,
                            job_title=job_title,
                            rating=stars_clean_number,
                            full_review=review_body,
                            review_source=source,
                            validated=validated,
                            incentivized=incentivized
                        )
                        review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['g2_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['g2_url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
To tweak your results, feel free to change any of the following constants inside `main`:

- `keyword_list`: Contains a list of keywords to be searched and scraped.
- `MAX_RETRIES`: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- `MAX_THREADS`: Defines the maximum number of threads to be used for concurrent scraping.
- `PAGES`: Specifies the number of pages to scrape for each keyword.
- `LOCATION`: Defines the geographic location from which the scraping requests appear to originate.

G2 search URLs are laid out like this: `https://www.g2.com/search?query=online+bank`. `https://www.g2.com/search?` holds the first part of our URL, and the query is tacked onto the end: `query=online+bank`. Additional parameters can be added to the URL with `&`.
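As a quick illustration, here is a minimal sketch (not part of the scraper itself) of how a search URL for a keyword like "online bank" can be assembled, mirroring the f-string approach used later in this article:

```python
keyword = "online bank"

# G2 expects spaces encoded as "+" in the query parameter
formatted_keyword = keyword.replace(" ", "+")

search_url = f"https://www.g2.com/search?query={formatted_keyword}"
print(search_url)  # https://www.g2.com/search?query=online+bank

# Additional parameters get appended with "&", e.g. a page number
paged_url = f"https://www.g2.com/search?page=2&query={formatted_keyword}"
print(paged_url)   # https://www.g2.com/search?page=2&query=online+bank
```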
Take a look at the search below for online bank.

Individual businesses get their own page on G2. Those URLs are laid out like this: `https://www.g2.com/products/name-of-business/reviews`, and you can see the `name` of a business nested within the page. All in all, the results page isn't too difficult.

When we inspect a rating, notice `stars-8` at the end of the class name. `8` is actually our rating, but doubled. `stars-10` would be a 5 star rating, `stars-9` would be 4.5 stars, `stars-8` is 4 stars, and so on and so forth.

To paginate our results, we use the `page` parameter. Our URL, updated for pagination, will look like this: `https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}`
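To make that doubling rule concrete, here is a tiny sketch (with a hypothetical class string, using the same math the scraper applies later) that converts a `stars-N` class into a star rating:

```python
# Hypothetical class value pulled from a rating element on the page
rating_class = "stars-9"

# The number at the end of the class is the rating doubled,
# so we split it off and divide by 2
doubled = float(rating_class.split("-")[-1])
stars = doubled / 2

print(stars)  # 4.5
```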
Our review pages follow the format we saw earlier:

`https://www.g2.com/products/name-of-business/reviews`

To control our geolocation, we use the ScrapeOps `country` parameter. If we want to appear in the UK, we can set this parameter to `"uk"`; if we want to be in the US, we can set this param to `"us"`. When we pass our `country` into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, our geolocation will show up correctly!

Before writing any code, let's create a new project folder, build a virtual environment, and install our dependencies.

```bash
mkdir g2-scraper
cd g2-scraper

python -m venv venv
source venv/bin/activate

pip install requests
pip install beautifulsoup4
```
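The scripts in this article read the ScrapeOps API key from a `config.json` file sitting next to the script. A minimal sketch of that setup (the key shown is a placeholder):

```python
import json

# config.json is assumed to live in the project folder and look like:
# {"api_key": "YOUR-SCRAPEOPS-API-KEY"}
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

print("Loaded an API key of length:", len(API_KEY))
```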
Our parsing logic runs inside a retry loop:

- `while` we still have `retries` left and the operation hasn't succeeded:
- `requests.get(url)` fetches the site
- `if` we get a `status_code` of 200, we've got a successful response
- If we get any other `status_code`, we `raise` an `Exception`
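Here is a stripped-down sketch of that retry pattern on its own, a simplified stand-in for the logic used throughout the scraper (the helper name and URL are just for illustration):

```python
import requests

def fetch_with_retries(url, retries=3):
    """Simplified version of the retry loop used in this article's scraper."""
    tries = 0
    while tries <= retries:
        try:
            response = requests.get(url)
            if response.status_code == 200:
                return response
            # Any non-200 response counts as a failure
            raise Exception(f"Failed request, Status Code {response.status_code}")
        except Exception as e:
            print(f"Attempt {tries} failed: {e}")
            tries += 1
    raise Exception(f"Max Retries exceeded: {retries}")

# Example usage (left commented out so the sketch runs without a network call):
# response = fetch_with_retries("https://www.g2.com/search?query=online+bank")
```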
Inside the parsing logic:

- We find the `name` with `div_card.find("div", class_="product-listing__product-name")`
- `name.find("a").get("href")` gets the link to the business, `g2_url`
- If there is a `rating` present on the page, we pull it from the page with `has_rating.text`. If there is no rating present, we give it a default rating of 0.0
- `div_card.find("p").text` gives us the description of the business

Here is the first iteration of our scraper:

```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")

            for div_card in div_cards:
                name = div_card.find("div", class_="product-listing__product-name")
                g2_url = name.find("a").get("href")
                has_rating = div_card.find("span", class_="fw-semibold")
                rating = 0.0
                if has_rating:
                    rating = has_rating.text
                description = div_card.find("p").text

                search_data = {
                    "name": name.text,
                    "stars": rating,
                    "g2_url": g2_url,
                    "description": description
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1  # count this attempt so we don't retry forever

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")
```
Each result gives us a `name`, `stars`, `g2_url`, and `description`. With this information we can create uniform objects representing each business from the page. Later on, this information goes a long way when generating our crawler report.

To add pagination, we fetch our results in batches using the URL format from earlier: `https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}`
We use `page_number+1` because `start_scrape()` uses a `for` loop that starts counting at zero. Take a look at the updated code below:

```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")

            for div_card in div_cards:
                name = div_card.find("div", class_="product-listing__product-name")
                g2_url = name.find("a").get("href")
                has_rating = div_card.find("span", class_="fw-semibold")
                rating = 0.0
                if has_rating:
                    rating = has_rating.text
                description = div_card.find("p").text

                search_data = {
                    "name": name.text,
                    "stars": rating,
                    "g2_url": g2_url,
                    "description": description
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1  # count this attempt so we don't retry forever

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")
```
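Just to make the zero-indexing concrete, here is a tiny sketch of how `range(pages)` maps onto G2's one-based `page` parameter:

```python
pages = 3

# range(pages) yields 0, 1, 2 -- so we add 1 to get G2's page numbers
for page_number in range(pages):
    print(f"https://www.g2.com/search?page={page_number+1}&query=online+bank")

# https://www.g2.com/search?page=1&query=online+bank
# https://www.g2.com/search?page=2&query=online+bank
# https://www.g2.com/search?page=3&query=online+bank
```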
In the code above, we added a `page_number` argument to `scrape_search_results()`. We also added a `start_scrape()` function, which gives us the ability to scrape multiple pages. Later on, we'll add concurrency to this function, but for now, we're just going to use a `for` loop as a placeholder.

Next, we need proper data storage, so we'll add two classes: `SearchData` and `DataPipeline`. While they might look a bit intimidating, these classes are relatively simple. `SearchData` represents individual businesses. `DataPipeline` takes `SearchData` as input. When the `DataPipeline` takes in the `SearchData`, it compares each object by its `name`. If two objects have the same `name`
, the second one gets dropped from the report. This simple approach goes a long way when filtering out duplicates.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom") for div_card in div_cards: name = div_card.find("div", class_="product-listing__product-name") g2_url = name.find("a").get("href") has_rating = div_card.find("span", class_="fw-semibold") rating = 0.0 if has_rating: rating = has_rating.text description = div_card.find("p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
In the code above:

- `DataPipeline` creates a pipeline to a CSV file and filters out duplicates on the way to the file
- `SearchData` is used to represent business objects to put into the pipeline

Next, we're going to use `ThreadPoolExecutor` to implement multithreading and crawl multiple pages simultaneously.

```python
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
```
Pay attention to the arguments of `executor.map()`:

- `scrape_search_results` tells the executor to run this function on each available thread
- `[keyword] * pages` passes our `keyword` into `executor.map()` as a list
- All of our other arguments get passed in as `list` objects as well
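If the repeated-list arguments feel opaque, this small standalone sketch (unrelated to G2, purely illustrative) shows how `executor.map()` lines the lists up into individual calls:

```python
import concurrent.futures

def greet(name, greeting, count):
    return f"{greeting}, {name}! (page {count + 1})"

names = ["crawler"] * 3      # same value repeated, like [keyword] * pages
greetings = ["Hello"] * 3    # like [location] * pages
counts = range(3)            # like range(pages)

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for result in executor.map(greet, names, greetings, counts):
        print(result)

# Hello, crawler! (page 1)
# Hello, crawler! (page 2)
# Hello, crawler! (page 3)
```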
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom") for div_card in div_cards: name = div_card.find("div", class_="product-listing__product-name") g2_url = name.find("a").get("href") has_rating = div_card.find("span", class_="fw-semibold") rating = 0.0 if has_rating: rating = has_rating.text description = div_card.find("p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location  # route the request through the requested country
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
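As a rough illustration (with a placeholder API key), calling the wrapper produces a proxied URL that carries the target URL and country along as query parameters:

```python
from urllib.parse import urlencode

API_KEY = "YOUR-API-KEY"  # placeholder -- normally loaded from config.json

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

print(get_scrapeops_url("https://www.g2.com/search?query=online+bank", location="uk"))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-API-KEY&url=https%3A%2F%2Fwww.g2.com%2Fsearch%3Fquery%3Donline%2Bbank&country=uk
```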
Instead of talking to G2 directly, we now talk to the ScrapeOps proxy, which routes our request through a server in the `country`
of choice.Each request we make is coming from a different IP address, so instead of looking like one really abnormal user, our crawler looks like a bunch of different normal users.In this example, our code barely changes at all, but it brings us to a production ready level. Take a look at the full code example below.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom") for div_card in div_cards: name = div_card.find("div", class_="product-listing__product-name") g2_url = name.find("a").get("href") has_rating = div_card.find("span", class_="fw-semibold") rating = 0.0 if has_rating: rating = has_rating.text description = div_card.find("p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Now, let's test this crawler out in production. In our `main` below, we're going to scrape 10 pages.

```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 10
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")
```
`PAGES` has been set to `10` and `LOCATION` has been set to `"us"`. Now let's see how long it takes to process 10 pages of data. Here are the results:

Next, we build the review scraper. The `process_business()` function below fetches an individual business page and parses its reviews.

```python
def process_business(row, location, retries=3):
    url = row["g2_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)

        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")

                anon_count = 0
                for review_card in review_cards:
                    review_date = review_card.find("time")
                    if review_date:
                        date = review_date.get("datetime")

                        name_present = review_card.find("a", class_="link--header-color")
                        name = name_present.text if name_present else "anonymous"
                        if name == "anonymous":
                            name = f"{name}-{anon_count}"
                            anon_count += 1

                        job_title_present = review_card.find("div", class_="mt-4th")
                        job_title = job_title_present.text if job_title_present else "n/a"

                        rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
                        rating_div = rating_container.find("div")
                        rating_class = rating_div.get("class")
                        stars_string = rating_class[-1]
                        stars_large_number = float(stars_string.split("-")[-1])
                        stars_clean_number = stars_large_number/2

                        review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text

                        info_container = review_card.find("div", class_="tags--teal")
                        incentives_dirty = info_container.find_all("div")
                        incentives_clean = []
                        source = ""
                        for incentive in incentives_dirty:
                            if incentive.text not in incentives_clean:
                                if "Review source:" in incentive.text:
                                    source = incentive.text.split(": ")[-1]
                                else:
                                    incentives_clean.append(incentive.text)
                        validated = "Validated Reviewer" in incentives_clean
                        incentivized = "Incentivized Review" in incentives_clean

                        review_data = {
                            "name": name,
                            "date": date,
                            "job_title": job_title,
                            "rating": stars_clean_number,
                            "full_review": review_body,
                            "review_source": source,
                            "validated": validated,
                            "incentivized": incentivized
                        }
                        print("Review Data:", review_data)

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['g2_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['g2_url']}")
```
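One detail worth calling out: anonymous reviewers get a numeric suffix so the name-based duplicate filter in `DataPipeline` doesn't throw away every anonymous review after the first. A tiny sketch of that idea (the sample names are made up):

```python
# Reviewer names pulled from a page; None means no name element was found
raw_names = ["Jane D.", None, None, "John S."]

anon_count = 0
cleaned = []
for raw in raw_names:
    name = raw if raw else "anonymous"
    if name == "anonymous":
        name = f"{name}-{anon_count}"  # anonymous-0, anonymous-1, ...
        anon_count += 1
    cleaned.append(name)

print(cleaned)  # ['Jane D.', 'anonymous-0', 'anonymous-1', 'John S.']
```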
When we parse each review in `process_business()`:

- If we find a `date` in each review, we pull the `date` with `date = review_date.get("datetime")`
- If a reviewer has no name, we name them `"anonymous"` and give them a number. This prevents different anonymous reviews from getting filtered out
- `job_title_present = review_card.find("div", class_="mt-4th")` checks if the `job_title` is present. If it is not, we give it a default value of `"n/a"`. Otherwise we pull the user's `job_title` from the post.
- `rating_div.get("class")` gets us the CSS class of the rating. We then `split("-")` to separate the number of stars from the CSS class. After splitting the stars, we divide them by 2 to get the actual rating.
- `review_card.find("div", attrs={"itemprop": "reviewBody"}).text` gives us the actual review
- We use an `incentives_dirty` list to hold all of the incentive tags from the review. If `"Review source:"` is in the text of the incentive item, we `split(": ")` to separate the source name and pull it. All other non-duplicate items get pushed into the `incentives_clean` list.
- If `"Validated Reviewer"` or `"Incentivized Review"` is inside the `incentives_clean` list, we set those variables to `True`

This function takes a `row` from our CSV file and then fetches the `g2_url` of the business. Once we can get the proper information from the site, we're ready to start reading our CSV file and scraping this valuable data.

To use our `process_business()` function, we need to be able to read the rows from our CSV file. Now we're going to fully update our code. Take a look at the function below:

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries)
```
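To make the incentive parsing in `process_business()` easier to follow, here is a small standalone sketch with made-up tag strings standing in for the elements the scraper pulls from a review card:

```python
# Made-up tag texts, similar in shape to what appears on a review card
incentives_dirty = [
    "Validated Reviewer",
    "Incentivized Review",
    "Review source: Organic",
    "Validated Reviewer",      # duplicate, gets skipped
]

incentives_clean = []
source = ""
for text in incentives_dirty:
    if text not in incentives_clean:
        if "Review source:" in text:
            source = text.split(": ")[-1]
        else:
            incentives_clean.append(text)

validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean

print(source, validated, incentivized)  # Organic True True
```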
Put it all together with `process_business()`. You can view the fully updated code below.

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    g2_url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")

            for div_card in div_cards:
                name = div_card.find("div", class_="product-listing__product-name")
                g2_url = name.find("a").get("href")
                has_rating = div_card.find("span", class_="fw-semibold")
                rating = 0.0
                if has_rating:
                    rating = has_rating.text
                description = div_card.find("p").text

                search_data = SearchData(
                    name=name.text,
                    stars=rating,
                    g2_url=g2_url,
                    description=description
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1  # count this attempt so we don't retry forever

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_business(row, location, retries=3):
    url = row["g2_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)

        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")

                anon_count = 0
                for review_card in review_cards:
                    review_date = review_card.find("time")
                    if review_date:
                        date = review_date.get("datetime")

                        name_present = review_card.find("a", class_="link--header-color")
                        name = name_present.text if name_present else "anonymous"
                        if name == "anonymous":
                            name = f"{name}-{anon_count}"
                            anon_count += 1

                        job_title_present = review_card.find("div", class_="mt-4th")
                        job_title = job_title_present.text if job_title_present else "n/a"

                        rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
                        rating_div = rating_container.find("div")
                        rating_class = rating_div.get("class")
                        stars_string = rating_class[-1]
                        stars_large_number = float(stars_string.split("-")[-1])
                        stars_clean_number = stars_large_number/2

                        review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text

                        info_container = review_card.find("div", class_="tags--teal")
                        incentives_dirty = info_container.find_all("div")
                        incentives_clean = []
                        source = ""
                        for incentive in incentives_dirty:
                            if incentive.text not in incentives_clean:
                                if "Review source:" in incentive.text:
                                    source = incentive.text.split(": ")[-1]
                                else:
                                    incentives_clean.append(incentive.text)
                        validated = "Validated Reviewer" in incentives_clean
                        incentivized = "Incentivized Review" in incentives_clean

                        review_data = {
                            "name": name,
                            "date": date,
                            "job_title": job_title,
                            "rating": stars_clean_number,
                            "full_review": review_body,
                            "review_source": source,
                            "validated": validated,
                            "incentivized": incentivized
                        }
                        print("Review Data:", review_data)

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['g2_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['g2_url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_business(row, location, retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
The `process_results()` function reads the rows from our CSV file and passes each of them into `process_business()`. `process_business()` then pulls our information and prints it to the terminal.

Next, we need to store this data instead of just printing it. Our `DataPipeline` is already able to do this, we just need another `@dataclass`. Take a look at the snippet below, it's our `ReviewData`.

```python
@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    job_title: str = ""
    rating: float = 0
    full_review: str = ""
    review_source: str = ""
    validated: bool = False
    incentivized: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
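As a quick sanity check, here is a minimal sketch (using the dataclass above with made-up values) showing how empty string fields get default text in `__post_init__`:

```python
# Assumes the ReviewData dataclass above is in scope
review = ReviewData(
    name="anonymous-0",
    date="2024-01-15",               # made-up example date
    job_title="",                    # empty -> becomes "No job_title"
    rating=4.5,
    full_review="Great product.  ",  # trailing spaces get stripped
    review_source="Organic",
    validated=True,
    incentivized=False
)

print(review.job_title)    # No job_title
print(review.full_review)  # Great product.
```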
`ReviewData` holds the following fields:

- `name: str`
- `date: str`
- `job_title: str`
- `rating: float`
- `full_review: str`
- `review_source: str`
- `validated: bool`
- `incentivized: bool`

From inside our parsing function, we open a new `DataPipeline` and pass our `ReviewData`
object into it.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" job_title: str = "" rating: float = 0 full_review: str = "" review_source: str = "" validated: bool = False incentivized: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom") for div_card in div_cards: name = div_card.find("div", class_="product-listing__product-name") g2_url = name.find("a").get("href") has_rating = div_card.find("span", class_="fw-semibold") rating = 0.0 if has_rating: rating = has_rating.text description = div_card.find("p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["g2_url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") anon_count = 0 for review_card in review_cards: review_date = review_card.find("time") if review_date: date = review_date.get("datetime") name_present = review_card.find("a", class_="link--header-color") name = name_present.text if name_present else "anonymous" if name == "anonymous": name = f"{name}-{anon_count}" anon_count += 1 job_title_present = review_card.find("div", class_="mt-4th") job_title = job_title_present.text if job_title_present else "n/a" rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only") rating_div = rating_container.find("div") rating_class = rating_div.get("class") stars_string = rating_class[-1] stars_large_number = float(stars_string.split("-")[-1]) stars_clean_number = stars_large_number/2 review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text info_container = 
review_card.find("div", class_="tags--teal") incentives_dirty = info_container.find_all("div") incentives_clean = [] source = "" for incentive in incentives_dirty: if incentive.text not in incentives_clean: if "Review source:" in incentive.text: source = incentive.text.split(": ")[-1] else: incentives_clean.append(incentive.text) validated = "Validated Reviewer" in incentives_clean incentivized = "Incentivized Review" in incentives_clean review_data = ReviewData( name=name, date=date, job_title=job_title, rating=stars_clean_number, full_review=review_body, review_source=source, validated=validated, incentivized=incentivized ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['g2_url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['g2_url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Just like we did with the crawler, we now use `ThreadPoolExecutor` to run `process_business()` on multiple businesses at the same time.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
Finally, we add proxy support to the review scraper. We already have our `get_scrapeops_url()` function, so we just need to place it into our script.

```python
response = requests.get(get_scrapeops_url(url, location=location))
```
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" job_title: str = "" rating: float = 0 full_review: str = "" review_source: str = "" validated: bool = False incentivized: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom") for div_card in div_cards: name = div_card.find("div", class_="product-listing__product-name") g2_url = name.find("a").get("href") has_rating = div_card.find("span", class_="fw-semibold") rating = 0.0 if has_rating: rating = has_rating.text description = div_card.find("p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["g2_url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") anon_count = 0 for review_card in review_cards: review_date = review_card.find("time") if review_date: date = review_date.get("datetime") name_present = review_card.find("a", class_="link--header-color") name = name_present.text if name_present else "anonymous" if name == "anonymous": name = f"{name}-{anon_count}" anon_count += 1 job_title_present = review_card.find("div", class_="mt-4th") job_title = job_title_present.text if job_title_present else "n/a" rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only") rating_div = rating_container.find("div") rating_class = rating_div.get("class") stars_string = rating_class[-1] stars_large_number = float(stars_string.split("-")[-1]) stars_clean_number = stars_large_number/2 review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text info_container 
= review_card.find("div", class_="tags--teal") incentives_dirty = info_container.find_all("div") incentives_clean = [] source = "" for incentive in incentives_dirty: if incentive.text not in incentives_clean: if "Review source:" in incentive.text: source = incentive.text.split(": ")[-1] else: incentives_clean.append(incentive.text) validated = "Validated Reviewer" in incentives_clean incentivized = "Incentivized Review" in incentives_clean review_data = ReviewData( name=name, date=date, job_title=job_title, rating=stars_clean_number, full_review=review_body, review_source=source, validated=validated, incentivized=incentivized ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['g2_url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['g2_url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Now we'll update our main
to crawl 10 pages.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 10 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
We set PAGES to 10 and our LOCATION to "us". Here are the results.
When scraping, always pay attention to a site's terms of service and robots.txt. You can view G2's terms here and their robots.txt is available here. Always be careful about the information you extract and don't scrape private or confidential data. If a website is hidden behind a login, that is generally considered private data. If your data does not require a login, it is generally considered to be public data. If you have questions about the legality of your scraping job, it is best to consult an attorney familiar with the laws and localities you're dealing with.
You should now know how to use the find() method in BeautifulSoup and you should understand some pretty complex string operations for extracting data. If you'd like to learn more about the tools used in this article, take a look at the links below:
Then check out ScrapeOps, the complete toolkit for web scraping.
To run this script, you first need a config.json file with your ScrapeOps API key. This script will perform a search based on any keywords in the keyword_list
and then generate a detailed report on businesses that match that keyword.After generating the report, the scraper reads it and does a detailed search report on each individual business from the original report.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" job_title: str = "" rating: float = 0 full_review: str = "" review_source: str = "" validated: bool = False incentivized: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']") for div_card in div_cards: name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']") g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']") has_rating = len(rating_elements) > 0 rating = 0.0 if has_rating: rating = rating_elements[0].text description = div_card.find_element(By.CSS_SELECTOR, "p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["g2_url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.get(get_scrapeops_url(url, location=location)) try: review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") anon_count = 0 for review_card in review_cards: review_date = review_card.find_elements(By.CSS_SELECTOR, "time") has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0 if len(review_date) > 0 and has_text: date = review_date[0].get_attribute("datetime") name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']") name = name_array[0].text if len(name_array) > 0 else "anonymous" if name == "anonymous": name = f"{name}-{anon_count}" anon_count += 1 job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']") job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a" rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']") rating_div = rating_container.find_element(By.CSS_SELECTOR, "div") rating_class = 
rating_div.get_attribute("class") stars_string = rating_class[-1] stars_large_number = float(stars_string.split("-")[-1]) stars_clean_number = stars_large_number/2 review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']") incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div") incentives_clean = [] source = "" for incentive in incentives_dirty: if incentive.text not in incentives_clean: if "Review source:" in incentive.text: source = incentive.text.split(": ")[-1] else: incentives_clean.append(incentive.text) validated = "Validated Reviewer" in incentives_clean incentivized = "Incentivized Review" in incentives_clean review_data = ReviewData( name=name, date=date, job_title=job_title, rating=stars_clean_number, full_review=review_body, review_source=source, validated=validated, incentivized=incentivized ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['g2_url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['g2_url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
keyword_list: Contains a list of keywords to be searched and scraped.
MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
PAGES: Specifies the number of pages to scrape for each keyword.
LOCATION: Defines the geographic location from which the scraping requests appear to originate.
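To tweak a crawl, you only need to change these values inside the main block. For example (the values and second keyword here are hypothetical, not from the article):

if __name__ == "__main__":
    MAX_RETRIES = 3                                  # retry each failed page up to 3 times
    MAX_THREADS = 5                                  # run up to 5 scraping threads at once
    PAGES = 2                                        # hypothetical: crawl 2 pages per keyword
    LOCATION = "us"                                  # requests appear to come from the US
    keyword_list = ["online bank", "crm software"]   # hypothetical keyword list
    # ...the rest of the main block stays the same as the full script above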
https://www.g2.com/search?query=online+bank
https://www.g2.com/search? holds the first part of our URL. Our query gets added onto the end: query=online+bank. We can also add more parameters with &. Take a look at the search below for online bank.
https://www.g2.com/products/name-of-business/reviews
These review URLs are built from the name of a business nested within the page. All in all, the results page isn't too difficult to parse through; we're only going to be taking 4 pieces of data from each result.
Notice the stars-8 at the end of the class name. 8 is actually our 4.0 rating... doubled. stars-10 would be a 5 star rating. stars-9 would be 4.5 stars, stars-8 is 4 stars... you get the idea. The stars-number is always double the actual rating.
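To make the doubling concrete, here is a small sketch (assuming a class name like "stars-8") of how a stars-N class maps back to the real rating:

# Hypothetical helper: convert a stars-N class name into a G2 rating.
def stars_class_to_rating(class_name):
    doubled = float(class_name.split("-")[-1])  # "stars-8" -> 8.0
    return doubled / 2                          # 8.0 -> 4.0

print(stars_class_to_rating("stars-8"))   # 4.0
print(stars_class_to_rating("stars-10"))  # 5.0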
To control pagination, we use the page parameter. Our updated URL should look like this:
https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}
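Here's a quick sketch of how zero-indexed page numbers map onto the URL's page parameter:

# Page 0 becomes page=1, page 1 becomes page=2, and so on.
keyword = "online bank"
formatted_keyword = keyword.replace(" ", "+")
for page_number in range(3):
    print(f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}")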
https://www.g2.com/products/name-of-business/reviews
If we want to appear in the UK, we can set the country parameter to "uk"; if we want to be in the US, we can set this param to "us". When we pass our country into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, our geolocation will show up correctly!
mkdir g2-scraper
cd g2-scraper
python -m venv venv
source venv/bin/activate
pip install selenium
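Before writing any scraping code, you may want to confirm that headless Chrome launches on your machine. Here is a minimal check (it assumes Chrome is installed and that Selenium can locate a matching driver):

from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
driver.get("https://www.g2.com")
print(driver.title)  # prints the page title if everything is wired up correctly
driver.quit()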
We try the following while we still have retries left and the operation hasn't succeeded:
driver.get(url) fetches the site.
We find each result's name with div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']").
name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") gets the link to the business, g2_url.
If there is a rating present on the page, we pull it from the page with rating_elements[0].text. If there is no rating present, we give it a default of 0.0.
description = div_card.find_element(By.CSS_SELECTOR, "p").text
gives us the description of the businessimport osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) driver.save_screenshot("test.png") logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']") for div_card in div_cards: name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']") g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']") has_rating = len(rating_elements) > 0 rating = 0.0 if has_rating: rating = rating_elements[0].text description = div_card.find_element(By.CSS_SELECTOR, "p").text search_data = { "name": name.text, "stars": rating, "g2_url": g2_url, "description": description } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
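One pattern from the snippet above worth highlighting is the use of find_elements() (plural) for optional elements: it returns a list, so a missing element just yields an empty list instead of raising an exception. A minimal sketch of the same idea, assuming we're inside the div_card loop:

# Optional-element pattern: fall back to a default when nothing matches.
rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
rating = rating_elements[0].text if len(rating_elements) > 0 else 0.0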
From each result, we extract the name, stars, g2_url, and description. We'll use this information to create uniform objects representing each business from the search results. This information is the very foundation for our crawler report.
https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}
We use page_number+1 because start_scrape()
begins counting at zero.Take a look at the updated code below:import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']") for div_card in div_cards: name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']") g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']") has_rating = len(rating_elements) > 0 rating = 0.0 if has_rating: rating = rating_elements[0].text description = div_card.find_element(By.CSS_SELECTOR, "p").text search_data = { "name": name.text, "stars": rating, "g2_url": g2_url, "description": description } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, max_threads=5, retries=3): for page in range(pages): scrape_search_results(keyword, location, page_number, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
We now pass a page_number to scrape_search_results(). We also added a start_scrape() function which gives us the ability to scrape multiple pages. Later on, we're going to add concurrency to this function. For the moment, we'll use a simple for loop as a placeholder.
Next we add two new classes: SearchData and DataPipeline. They might look a bit scary, but these classes are actually pretty simple. SearchData is used to represent individual business objects. DataPipeline takes our SearchData as input. When DataPipeline takes in our SearchData, it compares each object by name. If two objects share the same name
, the second one gets dropped. This approach works really well when removing duplicates.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']") for div_card in div_cards: name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']") g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']") has_rating = len(rating_elements) > 0 rating = 0.0 if has_rating: rating = rating_elements[0].text description = div_card.find_element(By.CSS_SELECTOR, "p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): for page in range(pages): scrape_search_results(keyword, location, page_number, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
DataPipeline gives us an efficient pipeline to a CSV output file. SearchData objects become individual rows in our CSV file.
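As a quick illustration (the business values below are hypothetical), duplicates are filtered by name before anything reaches the CSV:

# Hypothetical usage: the second "ExampleBank" entry gets dropped as a duplicate.
pipeline = DataPipeline(csv_filename="example.csv")
pipeline.add_data(SearchData(name="ExampleBank", stars=4.5,
                             g2_url="https://www.g2.com/products/examplebank/reviews",
                             description="An online bank"))
pipeline.add_data(SearchData(name="ExampleBank", stars=4.5,
                             g2_url="https://www.g2.com/products/examplebank/reviews",
                             description="Same name, so this one is dropped"))
pipeline.close_pipeline()  # flushes whatever is left in the queue to example.csv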
Our simple for loop isn't good enough if we want to run our crawler at scale in production. The function below refactors our start_scrape() function to use ThreadPoolExecutor and take advantage of the multithreading offered by our CPU.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
Inside executor.map():
scrape_search_results tells the executor to run this function on each available thread.
[keyword] * pages passes our keyword into executor.map() as a list.
All of our other arguments get passed in as a list
as wellimport osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']") for div_card in div_cards: name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']") g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']") has_rating = len(rating_elements) > 0 rating = 0.0 if has_rating: rating = rating_elements[0].text description = div_card.find_element(By.CSS_SELECTOR, "p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,  # use the location argument instead of hardcoding "us"
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
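For example, calling it with a G2 search URL returns a proxy.scrapeops.io URL with the original target URL encoded as a query parameter (the output below is approximate):

target_url = "https://www.g2.com/search?query=online+bank"
print(get_scrapeops_url(target_url, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=YOUR_KEY&url=https%3A%2F%2Fwww.g2.com%2Fsearch%3Fquery%3Donline%2Bbank&country=us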
This function converts a regular URL into a ScrapeOps proxied URL and lets us request pages from a country
of our choice.Each request we make is going to come from a different IP address. Instead of looking like one really weird user, our crawler looks like a random group of normal users.Our code barely changes at all here, but we're now a production ready level. Take a look at the full code example below.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']") for div_card in div_cards: name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']") g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']") has_rating = len(rating_elements) > 0 rating = 0.0 if has_rating: rating = rating_elements[0].text description = div_card.find_element(By.CSS_SELECTOR, "p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
In our updated main
below, we're going to scrape 10 pages.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 10 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
PAGES now gets set to 10 and LOCATION gets set to "us". Now, we need to process 10 pages of data. Here are the results.

def process_business(row, location, retries=3):
    url = row["g2_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        # driver.get() only takes a URL; location gets used once we add proxy support
        driver.get(url)
        try:
            review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']")
            anon_count = 0
            for review_card in review_cards:
                review_date = review_card.find_elements(By.CSS_SELECTOR, "time")
                has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0
                if len(review_date) > 0 and has_text:
                    date = review_date[0].get_attribute("datetime")
                    name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']")
                    name = name_array[0].text if len(name_array) > 0 else "anonymous"
                    if name == "anonymous":
                        name = f"{name}-{anon_count}"
                        anon_count += 1
                    job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']")
                    job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a"
                    rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']")
                    rating_div = rating_container.find_element(By.CSS_SELECTOR, "div")
                    rating_class = rating_div.get_attribute("class")
                    # the last class name holds the doubled star value, e.g. "stars-8"
                    stars_string = rating_class.split(" ")[-1]
                    stars_large_number = float(stars_string.split("-")[-1])
                    stars_clean_number = stars_large_number/2
                    review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text
                    info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']")
                    incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div")
                    incentives_clean = []
                    source = ""
                    for incentive in incentives_dirty:
                        if incentive.text not in incentives_clean:
                            if "Review source:" in incentive.text:
                                source = incentive.text.split(": ")[-1]
                            else:
                                incentives_clean.append(incentive.text)
                    validated = "Validated Reviewer" in incentives_clean
                    incentivized = "Incentivized Review" in incentives_clean
                    review_data = {
                        "name": name,
                        "date": date,
                        "job_title": job_title,
                        "rating": stars_clean_number,
                        "full_review": review_body,
                        "review_source": source,
                        "validated": validated,
                        "incentivized": incentivized
                    }
                    print(review_data)
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['g2_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['g2_url']}")
We only parse reviews that include a date. From each review, we pull the date with review_date[0].get_attribute("datetime").
If a reviewer has no name, we call them "anonymous" and give them a number. This prevents different anonymous reviews from getting filtered out.
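Here is the idea in isolation: because our pipeline later filters duplicates by name, every anonymous reviewer needs a unique name. A minimal sketch:

# Give each anonymous reviewer a unique suffix so they aren't dropped as duplicates.
anon_count = 0
for raw_name in ["Jane D.", "anonymous", "anonymous"]:  # hypothetical reviewer names
    name = raw_name
    if name == "anonymous":
        name = f"{name}-{anon_count}"
        anon_count += 1
    print(name)
# Jane D.
# anonymous-0
# anonymous-1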
if len(job_title_array) > 0 else "n/a" checks if the job_title is present. If it is not, we give it a default value of "n/a". Otherwise we pull the user's job_title from the post.
rating_div.get_attribute("class") pulls the CSS class from our rating. We then split("-") to separate the number of stars from the CSS class. After splitting the stars, we divide them by 2 to get the actual rating.
review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text
gives us the actual review.
We build an incentives_dirty list to hold all of the incentive tags from the review. If "Review source:" is in the text of the incentive item, we split(": ") to separate the source name and pull it. All other non-duplicate items get pushed into the incentives_clean list.
If "Validated Reviewer" or "Incentivized Review" is inside the incentives_clean list, we set those variables to True.
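As a quick illustration of the source extraction (the tag text "Review source: Organic" here is a hypothetical example):

tag_text = "Review source: Organic"
source = ""
if "Review source:" in tag_text:
    source = tag_text.split(": ")[-1]
print(source)  # Organic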
process_business() takes in a row from our CSV file. Then, it fetches the g2_url for the business. Now that we can extract the correct data from the site, we're ready to read our CSV file and scrape this valuable data.
To run process_business(), we need to read rows from the CSV file we created earlier. We're going to update our full code to do just that. Take a look at the function below:

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_business(row, location, retries=retries)
process_results() reads our CSV file and passes each row into process_business()
. You can view the fully updated code below.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']") for div_card in div_cards: name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']") g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']") has_rating = len(rating_elements) > 0 rating = 0.0 if has_rating: rating = rating_elements[0].text description = div_card.find_element(By.CSS_SELECTOR, "p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["g2_url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.get(url, location=location) try: review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']") anon_count = 0 for review_card in review_cards: review_date = review_card.find_elements(By.CSS_SELECTOR, "time") has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0 if len(review_date) > 0 and has_text: date = review_date[0].get_attribute("datetime") name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']") name = name_array[0].text if len(name_array) > 0 else "anonymous" if name == "anonymous": name = f"{name}-{anon_count}" anon_count += 1 job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']") job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a" rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']") rating_div = rating_container.find_element(By.CSS_SELECTOR, "div") rating_class = rating_div.get_attribute("class") stars_string = rating_class[-1] stars_large_number = float(stars_string.split("-")[-1]) 
stars_clean_number = stars_large_number/2 review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']") incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div") incentives_clean = [] source = "" for incentive in incentives_dirty: if incentive.text not in incentives_clean: if "Review source:" in incentive.text: source = incentive.text.split(": ")[-1] else: incentives_clean.append(incentive.text) validated = "Validated Reviewer" in incentives_clean incentivized = "Incentivized Review" in incentives_clean review_data = { "name": name, "date": date, "job_title": job_title, "rating": stars_clean_number, "full_review": review_body, "review_source": source, "validated": validated, "incentivized": incentivized } print(review_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['g2_url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['g2_url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
process_results() reads rows from our CSV file. It then passes each of these rows into process_business(). process_business() extracts our data and then prints it to the terminal.
Our DataPipeline is already built for storing this data, but we need another @dataclass. Take a look at the snippet below, it's our ReviewData.
@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    job_title: str = ""
    rating: float = 0
    full_review: str = ""
    review_source: str = ""
    validated: bool = False
    incentivized: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
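As a quick illustration of the __post_init__ cleanup (the values below are hypothetical): empty strings get a default placeholder and surrounding whitespace is stripped:

review = ReviewData(
    name="  Jane D.  ",
    date="2024-01-01",
    job_title="",              # empty string -> "No job_title"
    rating=4.5,
    full_review="Great product",
    review_source="Organic"
)
print(review.name)       # Jane D.
print(review.job_title)  # No job_title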
ReviewData uses the following fields to represent reviews on the page:
name: str
date: str
job_title: str
rating: float
full_review: str
review_source: str
validated: bool
incentivized: bool
From inside our parsing function, we open a new DataPipeline and pass each ReviewData
object into it.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" job_title: str = "" rating: float = 0 full_review: str = "" review_source: str = "" validated: bool = False incentivized: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']") for div_card in div_cards: name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']") g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']") has_rating = len(rating_elements) > 0 rating = 0.0 if has_rating: rating = rating_elements[0].text description = div_card.find_element(By.CSS_SELECTOR, "p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["g2_url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.get(url, location=location) try: review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") anon_count = 0 for review_card in review_cards: review_date = review_card.find_elements(By.CSS_SELECTOR, "time") has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0 if len(review_date) > 0 and has_text: date = review_date[0].get_attribute("datetime") name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']") name = name_array[0].text if len(name_array) > 0 else "anonymous" if name == "anonymous": name = f"{name}-{anon_count}" anon_count += 1 job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']") job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a" rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']") rating_div = rating_container.find_element(By.CSS_SELECTOR, "div") rating_class = rating_div.get_attribute("class") stars_string 
= rating_class[-1] stars_large_number = float(stars_string.split("-")[-1]) stars_clean_number = stars_large_number/2 review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']") incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div") incentives_clean = [] source = "" for incentive in incentives_dirty: if incentive.text not in incentives_clean: if "Review source:" in incentive.text: source = incentive.text.split(": ")[-1] else: incentives_clean.append(incentive.text) validated = "Validated Reviewer" in incentives_clean incentivized = "Incentivized Review" in incentives_clean review_data = ReviewData( name=name, date=date, job_title=job_title, rating=stars_clean_number, full_review=review_body, review_source=source, validated=validated, incentivized=incentivized ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['g2_url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['g2_url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_business(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
To scrape each business concurrently, we use ThreadPoolExecutor in basically the same way we did earlier. The biggest difference is that we read our CSV file first.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_business,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

This replaces our simple for loop. The rest of our code remains pretty much the same. We already wrote our get_scrapeops_url() function, so we just need to plug it into our scraping logic:

driver.get(get_scrapeops_url(url, location=location))
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": "us", } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 g2_url: str = "" description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" job_title: str = "" rating: float = 0 full_review: str = "" review_source: str = "" validated: bool = False incentivized: bool = False def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']") for div_card in div_cards: name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']") g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']") has_rating = len(rating_elements) > 0 rating = 0.0 if has_rating: rating = rating_elements[0].text description = div_card.find_element(By.CSS_SELECTOR, "p").text search_data = SearchData( name=name.text, stars=rating, g2_url=g2_url, description=description ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_business(row, location, retries=3): url = row["g2_url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.get(get_scrapeops_url(url, location=location)) try: review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") anon_count = 0 for review_card in review_cards: review_date = review_card.find_elements(By.CSS_SELECTOR, "time") has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0 if len(review_date) > 0 and has_text: date = review_date[0].get_attribute("datetime") name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']") name = name_array[0].text if len(name_array) > 0 else "anonymous" if name == "anonymous": name = f"{name}-{anon_count}" anon_count += 1 job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']") job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a" rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']") rating_div = rating_container.find_element(By.CSS_SELECTOR, "div") rating_class = 
rating_div.get_attribute("class") stars_string = rating_class[-1] stars_large_number = float(stars_string.split("-")[-1]) stars_clean_number = stars_large_number/2 review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']") incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div") incentives_clean = [] source = "" for incentive in incentives_dirty: if incentive.text not in incentives_clean: if "Review source:" in incentive.text: source = incentive.text.split(": ")[-1] else: incentives_clean.append(incentive.text) validated = "Validated Reviewer" in incentives_clean incentivized = "Incentivized Review" in incentives_clean review_data = ReviewData( name=name, date=date, job_title=job_title, rating=stars_clean_number, full_review=review_body, review_source=source, validated=validated, incentivized=incentivized ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['g2_url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['g2_url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_business, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["online bank"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Now, let's update our main to crawl 10 pages.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 10
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

We set PAGES to 10 and our LOCATION to "us". Here are the results.

Whenever you scrape a site, pay attention to its terms of service and its robots.txt. You can view G2's terms here and their robots.txt is available here. Always be careful about the information you extract and don't scrape private or confidential data. If a website is hidden behind a login, that is generally considered private data. If your data does not require a login, it is generally considered to be public data. If you have questions about the legality of your scraping job, it is best to consult an attorney familiar with the laws and localities you're dealing with.

By now you should know how to use the find_element() and find_elements() methods from Selenium, and you should understand some pretty complex string operations for extracting data. If you'd like to learn more about the tools used in this article, take a look at the links below. Then check out ScrapeOps, the complete toolkit for web scraping.
To run the full Puppeteer scraper below, create a config.json
file with your ScrapeOps Proxy API key and place it in the same folder as this script.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; console.log("api key:", API_KEY); async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.g2.com/search?page=${pageNumber+1}&query=${formattedKeyword}`; const proxyUrl = getScrapeOpsUrl(url, location); console.log(proxyUrl) await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='product-listing mb-1 border-bottom']"); for (const divCard of divCards) { const nameElement = await divCard.$("div[class='product-listing__product-name']"); const name = await page.evaluate(element => element.textContent, nameElement); const g2UrlElement = await nameElement.$("a"); const g2Url = await page.evaluate(element => element.getAttribute("href"), g2UrlElement); let rating = 0.0; const ratingElement = await divCard.$("span[class='fw-semibold']"); if (ratingElement) { rating = await page.evaluate(element => element.textContent, ratingElement); } const descriptionElement = await divCard.$("p"); const description = await page.evaluate(element => element.textContent, descriptionElement) const businessInfo = { name: name, stars: rating, g2_url: g2Url, description: description }; await writeToCsv([businessInfo], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.g2_url; let tries = 0; let success = false; while (tries <= retries && 
!success) { const page = await browser.newPage(); try { await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 }); const reviewCards = await page.$$("div[class='paper paper--white paper--box mb-2 position-relative border-bottom']"); let anonCount = 0; for (const reviewCard of reviewCards) { reviewDateElement = await reviewCard.$("time"); reviewTextElement = await reviewCard.$("div[itemprop='reviewBody']"); if (reviewDateElement && reviewTextElement) { const date = await page.evaluate(element => element.getAttribute("datetime"), reviewDateElement); const reviewBody = await page.evaluate(element => element.textContent, reviewTextElement); const nameElement = await reviewCard.$("a[class='link--header-color']"); let name; if (nameElement) { name = await page.evaluate(element => element.textContent, nameElement); } else { name = `anonymous-${anonCount}`; anonCount++; } const jobTitleElement = await reviewCard.$("div[class='mt-4th']"); let jobTitle; if (jobTitleElement) { jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); } else { jobTitle = "n/a"; } const ratingContainer = await reviewCard.$("div[class='f-1 d-f ai-c mb-half-small-only']"); const ratingDiv = await ratingContainer.$("div"); const ratingClass = await page.evaluate(element => element.getAttribute("class"), ratingDiv); const ratingArray = ratingClass.split("-"); const rating = Number(ratingArray[ratingArray.length-1])/2; const infoContainer = await reviewCard.$("div[class='tags--teal']"); const incentivesDirty = await infoContainer.$$("div"); const incentivesClean = []; let source = ""; for (const incentive of incentivesDirty) { const text = await page.evaluate(element => element.textContent, incentive); if (!incentivesClean.includes(text)) { if (text.includes("Review source:")) { textArray = text.split(": "); source = textArray[textArray.length-1]; } else { incentivesClean.push(text); } } } const validated = incentivesClean.includes("Validated Reviewer"); const incentivized = incentivesClean.includes("Incentivized Review"); const reviewData = { name: name, date: date, job_title: jobTitle, rating: rating, full_review: reviewBody, review_source: source, validated: validated, incentivized: incentivized } await writeToCsv([reviewData], `${row.name.replace(" ", "-")}.csv`); } } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, concurrencyLimit, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); while (businesses.length > 0) { const currentBatch = businesses.splice(0, concurrencyLimit); const tasks = currentBatch.map(business => processBusiness(browser, business, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close(); } async function main() { const keywords = ["online bank"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); } console.log("Scrape complete");} main();
You can change any of the following constants inside the main function as well:

- keywords: Contains a list of keywords to be searched and scraped.
- retries: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- concurrencyLimit: Defines the maximum number of threads to be used for concurrent scraping.
- pages: Specifies the number of pages to scrape for each keyword.
- location: Defines the geographic location from which the scraping requests appear to originate.

When we search G2 for a keyword, we end up at a URL like https://www.g2.com/search?query=online+bank.
https://www.g2.com/search? holds the actual domain of our URL. The query is on the end: query=online+bank. We can also add more parameters with &. Take a look at the search below for online bank.

Review pages use the URL format https://www.g2.com/products/name-of-business/reviews, with the name of a business nested within the page. The results page isn't too difficult to parse, and we're only going to be taking 4 pieces of data from each result.

Ratings are embedded in a CSS class that ends with something like stars-8. The stars-number is actually double the number of our rating: stars-10 would be a 5 star review, stars-9 would be 4.5, stars-8 would be 4. You get the idea. Divide the stars-number by two and you get your rating.

To recap, these are the URL formats we'll be working with:

- https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}
- https://www.g2.com/products/name-of-business/reviews

The short sketch below shows the URL construction and the rating conversion in isolation.
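Here is a minimal standalone sketch of those two ideas. The helper names buildSearchUrl and ratingFromClass, the keyword "online bank", and the class name "stars-8" are illustrative values for this example, not pulled from a live page.

// Build the G2 search URL for a given keyword and page number.
function buildSearchUrl(keyword, pageNumber) {
    const formattedKeyword = keyword.replace(" ", "+");
    return `https://www.g2.com/search?page=${pageNumber + 1}&query=${formattedKeyword}`;
}

// Convert a rating class like "stars-8" into a star rating.
function ratingFromClass(ratingClass) {
    const parts = ratingClass.split("-");
    return Number(parts[parts.length - 1]) / 2;
}

console.log(buildSearchUrl("online bank", 0)); // https://www.g2.com/search?page=1&query=online+bank
console.log(ratingFromClass("stars-8"));       // 4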
If we want to appear in the UK, we can set the country parameter to "uk", and if we want to be in the US, we can set this param to "us". When we pass our country into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, our geolocation will show up correctly! A short sketch of this, along with the config.json file the scraper reads, follows the setup steps below.

To set up the project, create a new folder and initialize it:

mkdir g2-scraper
cd g2-scraper
npm init --y

Then install the dependencies:

npm install puppeteer
npm install csv-writer
npm install csv-parse
npm install fs
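Here is a minimal sketch of how the API key and country parameter come together. It mirrors the getScrapeOpsUrl() function we use throughout the full scripts; the key value inside config.json is a placeholder you replace with your own.

const fs = require("fs");

// config.json sits next to the script and looks like: { "api_key": "YOUR-SCRAPEOPS-API-KEY" }
const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key;

// Wrap any target URL in the ScrapeOps proxy and pin the request to a country.
function getScrapeOpsUrl(url, location = "us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

// Requests made through this URL will exit from a server in the UK.
console.log(getScrapeOpsUrl("https://www.g2.com/search?query=online+bank", "uk"));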
Our parsing function runs inside a while loop: while we still have retries left and the operation hasn't succeeded:

- await page.goto(url) fetches the site
- we pull the name with await page.evaluate(element => element.textContent, nameElement)
- await page.evaluate(element => element.getAttribute("href"), g2UrlElement) gets the link to the business, g2_url
- if there is a rating present on the page, we pull it from the page. If there is no rating present, we set a default of 0.0
- await page.evaluate(element => element.textContent, descriptionElement) gives us the description of the business
- finally, we print each businessInfo
to the consoleconst puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.g2.com/search?query=${formattedKeyword}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='product-listing mb-1 border-bottom']"); for (const divCard of divCards) { const nameElement = await divCard.$("div[class='product-listing__product-name']"); const name = await page.evaluate(element => element.textContent, nameElement); const g2UrlElement = await nameElement.$("a"); const g2Url = await page.evaluate(element => element.getAttribute("href"), g2UrlElement); let rating = 0.0; const ratingElement = await divCard.$("span[class='fw-semibold']"); if (ratingElement) { rating = await page.evaluate(element => element.textContent, ratingElement); } const descriptionElement = await divCard.$("p"); const description = await page.evaluate(element => element.textContent, descriptionElement); const businessInfo = { name: name, stars: rating, g2_url: g2Url, description: description }; console.log(businessInfo); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, location, retries) { const browser = await puppeteer.launch() await scrapeSearchResults(browser, keyword, location, retries) await browser.close();} async function main() { const keywords = ["online bank"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, location, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }} main();
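Each listing parsed by the code above ends up as a plain businessInfo object. Here's a representative record; the values are made up for illustration, but the shape matches the four fields we just extracted.

// A representative businessInfo record produced by the parser above
// (values are illustrative, not real scraped data).
const exampleBusinessInfo = {
    name: "Example Bank",
    stars: 4.5, // defaults to 0.0 when no rating element is present
    g2_url: "https://www.g2.com/products/example-bank/reviews",
    description: "An example product description from the search results."
};

console.log(exampleBusinessInfo);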
From each result we extract the name, stars, g2_url, and description. This data allows us to create objects that represent each business. Everything we do from here depends on the data we pull with this parsing function.

To paginate our results, we fetch URLs of the format https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}. We use page_number+1 because startScrape()
begins counting at zero.Take a look at the updated code below:const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.g2.com/search?page=${pageNumber+1}&query=${formattedKeyword}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='product-listing mb-1 border-bottom']"); for (const divCard of divCards) { const nameElement = await divCard.$("div[class='product-listing__product-name']"); const name = await page.evaluate(element => element.textContent, nameElement); const g2UrlElement = await nameElement.$("a"); const g2Url = await page.evaluate(element => element.getAttribute("href"), g2UrlElement); let rating = 0.0; const ratingElement = await divCard.$("span[class='fw-semibold']"); if (ratingElement) { rating = await page.evaluate(element => element.textContent, ratingElement); } const descriptionElement = await divCard.$("p"); const description = await page.evaluate(element => element.textContent, descriptionElement) const businessInfo = { name: name, stars: rating, g2_url: g2Url, description: description }; console.log(businessInfo); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, location, retries); } await browser.close();} async function main() { const keywords = ["online bank"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }} main();
In this version, we pass a pageNumber to scrapeSearchResults(). We also added some functionality to startScrape(). It now creates a list of pages to scrape. It then iterates through the list and runs scrapeSearchResults() on each of the pages from the list.

To store our data, we add a writeToCsv() function. You can look at it in the snippet below:

async function writeToCsv(data, outputFile) {
    if (!data || data.length === 0) {
        throw new Error("No data to write!");
    }
    const fileExists = fs.existsSync(outputFile);

    const headers = Object.keys(data[0]).map(key => ({id: key, title: key}))

    const csvWriter = createCsvWriter({
        path: outputFile,
        header: headers,
        append: fileExists
    });

    try {
        await csvWriter.writeRecords(data);
    } catch (e) {
        throw new Error("Failed to write to csv");
    }
}

This function takes an array of objects and writes them to outputFile (a tiny demo of this follows the full script below). If outputFile
already exists, we open it in append mode so we don't overwrite any important data. If the file doesn't exist yet, this function will create it.Here is the full code after it's been updated to write our information to a CSV.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.g2.com/search?page=${pageNumber+1}&query=${formattedKeyword}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='product-listing mb-1 border-bottom']"); for (const divCard of divCards) { const nameElement = await divCard.$("div[class='product-listing__product-name']"); const name = await page.evaluate(element => element.textContent, nameElement); const g2UrlElement = await nameElement.$("a"); const g2Url = await page.evaluate(element => element.getAttribute("href"), g2UrlElement); let rating = 0.0; const ratingElement = await divCard.$("span[class='fw-semibold']"); if (ratingElement) { rating = await page.evaluate(element => element.textContent, ratingElement); } const descriptionElement = await divCard.$("p"); const description = await page.evaluate(element => element.textContent, descriptionElement) const businessInfo = { name: name, stars: rating, g2_url: g2Url, description: description }; await writeToCsv([businessInfo], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, location, retries); } await browser.close();} async function main() { const keywords = ["online bank"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }} main();
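To see that append behaviour in isolation, here is a tiny demo. It assumes the writeToCsv() function from the script above is in scope; the file name demo.csv and the row values are made up for the example.

// Assumes writeToCsv() from the script above is in scope.
async function demoAppendBehaviour() {
    // First call creates demo.csv and writes the header row.
    await writeToCsv([{ name: "Example Bank", stars: 4.5 }], "demo.csv");
    // Second call appends a new row without repeating the header.
    await writeToCsv([{ name: "Another Bank", stars: 4.0 }], "demo.csv");
}

demoAppendBehaviour().catch(err => console.log(`Demo failed: ${err}`));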
As a recap, writeToCsv() takes an array of objects and writes them to a CSV file.

A simple for loop isn't good enough for a production level crawler. We refactored startScrape() to use a concurrencyLimit and take advantage of the multiple pages inside the browser.

async function startScrape(keyword, pages, location, concurrencyLimit, retries) {
    const pageList = range(0, pages);
    const browser = await puppeteer.launch()

    while (pageList.length > 0) {
        const currentBatch = pageList.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }
    await browser.close();
}

Here's how it works (a stripped-down sketch of the batching pattern follows the full code below):

- while pageList.length is greater than zero, we splice() a batch from index zero up to our concurrencyLimit
- we run scrapeSearchResults() on each page in the batch simultaneously and then await the results of the batch
- this repeats until pageList
shrinks all the way down to zero. Each time a batch is processed, this list gets smaller and frees up more memory. Theoretically, the longer this function runs, the faster it gets.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.g2.com/search?page=${pageNumber+1}&query=${formattedKeyword}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='product-listing mb-1 border-bottom']"); for (const divCard of divCards) { const nameElement = await divCard.$("div[class='product-listing__product-name']"); const name = await page.evaluate(element => element.textContent, nameElement); const g2UrlElement = await nameElement.$("a"); const g2Url = await page.evaluate(element => element.getAttribute("href"), g2UrlElement); let rating = 0.0; const ratingElement = await divCard.$("span[class='fw-semibold']"); if (ratingElement) { rating = await page.evaluate(element => element.textContent, ratingElement); } const descriptionElement = await divCard.$("p"); const description = await page.evaluate(element => element.textContent, descriptionElement) const businessInfo = { name: name, stars: rating, g2_url: g2Url, description: description }; await writeToCsv([businessInfo], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ["online bank"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }} main();
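To see the splice() and Promise.all() batching pattern on its own, here is a stripped-down sketch with a dummy task standing in for scrapeSearchResults(); the task, its timing, and the page numbers are purely illustrative.

// Process an array of jobs in batches of `concurrencyLimit`.
async function runInBatches(jobs, concurrencyLimit, task) {
    while (jobs.length > 0) {
        // Remove the next batch from the front of the array.
        const currentBatch = jobs.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(job => task(job));
        try {
            // Wait for every task in the batch before starting the next batch.
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }
}

// Dummy task standing in for scrapeSearchResults().
const fakeScrape = page => new Promise(resolve => setTimeout(() => {
    console.log(`Finished page ${page}`);
    resolve();
}, 100));

runInBatches([0, 1, 2, 3, 4, 5, 6], 3, fakeScrape);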
To get past anti-bots and geotarget our requests, we route every URL through the ScrapeOps Proxy with the function below.

function getScrapeOpsUrl(url, location="us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

This function takes a regular URL and converts it into a ScrapeOps proxied URL routed through the country
we choose. Each time we do something, that request comes from a different IP address!We don't appear like one really bizarre fast user, our crawler looks like a bunch of normal users.Our code barely changes at all here, but we're now a production ready level. Take a look at the full code example below.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.g2.com/search?page=${pageNumber+1}&query=${formattedKeyword}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='product-listing mb-1 border-bottom']"); for (const divCard of divCards) { const nameElement = await divCard.$("div[class='product-listing__product-name']"); const name = await page.evaluate(element => element.textContent, nameElement); const g2UrlElement = await nameElement.$("a"); const g2Url = await page.evaluate(element => element.getAttribute("href"), g2UrlElement); let rating = 0.0; const ratingElement = await divCard.$("span[class='fw-semibold']"); if (ratingElement) { rating = await page.evaluate(element => element.textContent, ratingElement); } const descriptionElement = await divCard.$("p"); const description = await page.evaluate(element => element.textContent, descriptionElement) const businessInfo = { name: name, stars: rating, g2_url: g2Url, description: description }; await writeToCsv([businessInfo], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ["online bank"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, 
location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }} main();
It's time to test the crawler at scale. You can see our updated main below.

async function main() {
    const keywords = ["online bank"];
    const concurrencyLimit = 5;
    const pages = 10;
    const location = "us";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        await startScrape(keyword, pages, location, concurrencyLimit, retries);
        console.log("Crawl complete");
        aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`);
    }
}

pages is set to 10, location gets set to "us", and concurrencyLimit
is set to 5. Now, we need to process 10 pages of data.Here are the results:async function processBusiness(browser, row, location, retries = 3) { const url = row.g2_url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(url); const reviewCards = await page.$$("div[class='paper paper--white paper--box mb-2 position-relative border-bottom']"); let anonCount = 0; for (const reviewCard of reviewCards) { reviewDateElement = await reviewCard.$("time"); reviewTextElement = await reviewCard.$("div[itemprop='reviewBody']"); if (reviewDateElement && reviewTextElement) { const date = await page.evaluate(element => element.getAttribute("datetime"), reviewDateElement); const reviewBody = await page.evaluate(element => element.textContent, reviewTextElement); const nameElement = await reviewCard.$("a[class='link--header-color']"); let name; if (nameElement) { name = await page.evaluate(element => element.textContent, nameElement); } else { name = `anonymous-${anonCount}`; anonCount++; } const jobTitleElement = await reviewCard.$("div[class='mt-4th']"); let jobTitle; if (jobTitleElement) { jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); } else { jobTitle = "n/a"; } const ratingContainer = await reviewCard.$("div[class='f-1 d-f ai-c mb-half-small-only']"); const ratingDiv = await ratingContainer.$("div"); const ratingClass = await page.evaluate(element => element.getAttribute("class"), ratingDiv); const ratingArray = ratingClass.split("-"); const rating = Number(ratingArray[ratingArray.length-1])/2; const infoContainer = await reviewCard.$("div[class='tags--teal']"); const incentivesDirty = await infoContainer.$$("div"); const incentivesClean = []; let source = ""; for (const incentive of incentivesDirty) { const text = await page.evaluate(element => element.textContent, incentive); if (!incentivesClean.includes(text)) { if (text.includes("Review source:")) { textArray = text.split(": "); source = textArray[textArray.length-1]; } else { incentivesClean.push(text); } } } const validated = incentivesClean.includes("Validated Reviewer"); const incentivized = incentivesClean.includes("Incentivized Review"); const reviewData = { name: name, date: date, job_title: jobTitle, rating: rating, full_review: reviewBody, review_source: source, validated: validated, incentivized: incentivized } console.log(reviewData); } } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}`); tries++; } finally { await page.close(); } } }
Here's how processBusiness() extracts each review:

- We only parse review cards that contain both review text and a date. From each review, we pull the date with await page.evaluate(element => element.getAttribute("datetime"), reviewDateElement);
- If a reviewer chose to remain anonymous, we name them "anonymous" and give them a number. This prevents different anonymous reviews from getting filtered out.
- await reviewCard.$("div[class='mt-4th']") checks if the job_title is present. If it is not, we give it a default value of "n/a". Otherwise we pull the user's job_title from the post.
- await page.evaluate(element => element.getAttribute("class"), ratingDiv) pulls the CSS class from our rating. We then split("-") to separate the number of stars from the CSS class. After splitting the stars, we divide them by 2 to get the actual rating.
- await page.evaluate(element => element.textContent, reviewTextElement) gives us the actual review.
- We use an incentivesDirty list to hold all of the incentive tags from the review. If "Review source:" is in the text of the incentive item, we split(": ") to separate the source name and pull it. All other non-duplicate items get pushed into the incentivesClean list (the standalone sketch after this list shows this tag-sorting step).
- If "Validated Reviewer" or "Incentivized Review" is inside the incentivesClean list, we set those variables to true.
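Here is a standalone sketch of that tag-sorting step, using a hard-coded array of tag strings instead of live elements pulled from the page; the tag texts are illustrative.

// Example tag texts as they might appear in a review's info container.
const tagTexts = [
    "Validated Reviewer",
    "Incentivized Review",
    "Review source: Organic",
    "Validated Reviewer"            // duplicate, gets ignored
];

const incentivesClean = [];
let source = "";

for (const text of tagTexts) {
    if (!incentivesClean.includes(text)) {
        if (text.includes("Review source:")) {
            // "Review source: Organic" -> "Organic"
            const textArray = text.split(": ");
            source = textArray[textArray.length - 1];
        } else {
            incentivesClean.push(text);
        }
    }
}

const validated = incentivesClean.includes("Validated Reviewer");
const incentivized = incentivesClean.includes("Incentivized Review");

console.log({ source, validated, incentivized });
// { source: 'Organic', validated: true, incentivized: true }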
processBusiness() takes in a row from our CSV file. It then looks up the row's g2_url and fetches it with page.goto(). Now that we can get the right data from our site, we're ready to read our CSV file and pull all this important data.

Before we can run processBusiness(), we need to read the CSV that the crawler creates. Let's update our code so our new function, processResults(), can handle this. Take a look at the new function below:

async function processResults(csvFile, location, retries) {
    const businesses = await readCsv(csvFile);
    const browser = await puppeteer.launch();

    for (const business of businesses) {
        await processBusiness(browser, business, location, retries);
    }
    await browser.close();
}

We need a readCsv() function as well. It takes in a CSV file and spits out an array of JSON objects (a quick way to inspect its output appears after the full script below).

async function readCsv(inputFile) {
    const results = [];
    const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({
        columns: true,
        delimiter: ",",
        trim: true,
        skip_empty_lines: true
    }));

    for await (const record of parser) {
        results.push(record);
    }
    return results;
}

processResults() reads the CSV file and then converts all the rows into an array. We then iterate through this array and pass each row into processBusiness()
. You can view the updated code below.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.g2.com/search?page=${pageNumber+1}&query=${formattedKeyword}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='product-listing mb-1 border-bottom']"); for (const divCard of divCards) { const nameElement = await divCard.$("div[class='product-listing__product-name']"); const name = await page.evaluate(element => element.textContent, nameElement); const g2UrlElement = await nameElement.$("a"); const g2Url = await page.evaluate(element => element.getAttribute("href"), g2UrlElement); let rating = 0.0; const ratingElement = await divCard.$("span[class='fw-semibold']"); if (ratingElement) { rating = await page.evaluate(element => element.textContent, ratingElement); } const descriptionElement = await divCard.$("p"); const description = await page.evaluate(element => element.textContent, descriptionElement) const businessInfo = { name: name, stars: rating, g2_url: g2Url, description: description }; await writeToCsv([businessInfo], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.g2_url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(url); const reviewCards = await 
page.$$("div[class='paper paper--white paper--box mb-2 position-relative border-bottom']"); let anonCount = 0; for (const reviewCard of reviewCards) { reviewDateElement = await reviewCard.$("time"); reviewTextElement = await reviewCard.$("div[itemprop='reviewBody']"); if (reviewDateElement && reviewTextElement) { const date = await page.evaluate(element => element.getAttribute("datetime"), reviewDateElement); const reviewBody = await page.evaluate(element => element.textContent, reviewTextElement); const nameElement = await reviewCard.$("a[class='link--header-color']"); let name; if (nameElement) { name = await page.evaluate(element => element.textContent, nameElement); } else { name = `anonymous-${anonCount}`; anonCount++; } const jobTitleElement = await reviewCard.$("div[class='mt-4th']"); let jobTitle; if (jobTitleElement) { jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); } else { jobTitle = "n/a"; } const ratingContainer = await reviewCard.$("div[class='f-1 d-f ai-c mb-half-small-only']"); const ratingDiv = await ratingContainer.$("div"); const ratingClass = await page.evaluate(element => element.getAttribute("class"), ratingDiv); const ratingArray = ratingClass.split("-"); const rating = Number(ratingArray[ratingArray.length-1])/2; const infoContainer = await reviewCard.$("div[class='tags--teal']"); const incentivesDirty = await infoContainer.$$("div"); const incentivesClean = []; let source = ""; for (const incentive of incentivesDirty) { const text = await page.evaluate(element => element.textContent, incentive); if (!incentivesClean.includes(text)) { if (text.includes("Review source:")) { textArray = text.split(": "); source = textArray[textArray.length-1]; } else { incentivesClean.push(text); } } } const validated = incentivesClean.includes("Validated Reviewer"); const incentivized = incentivesClean.includes("Incentivized Review"); const reviewData = { name: name, date: date, job_title: jobTitle, rating: rating, full_review: reviewBody, review_source: source, validated: validated, incentivized: incentivized } console.log(reviewData); } } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); for (const business of businesses) { await processBusiness(browser, business, location, retries); } await browser.close(); } async function main() { const keywords = ["online bank"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); }} main();
In the updated code, processResults() reads our CSV file. Then, it passes each row into processBusiness(). processBusiness() extracts our data and then prints it to the console.

To store the reviews, we remove that console.log() call and replace it with the following line:

await writeToCsv([reviewData], `${row.name.replace(" ", "-")}.csv`);

Each reviewData object uses the following fields to represent reviews from the webpage:

- name
- date
- job_title
- rating
- full_review
- review_source
- validated
- incentivized

We then pass each reviewData into writeToCsv()
. Just like before, we pass each object in as soon as it's been processed so we can save as much data as possible in the event of a crash.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.g2.com/search?page=${pageNumber+1}&query=${formattedKeyword}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='product-listing mb-1 border-bottom']"); for (const divCard of divCards) { const nameElement = await divCard.$("div[class='product-listing__product-name']"); const name = await page.evaluate(element => element.textContent, nameElement); const g2UrlElement = await nameElement.$("a"); const g2Url = await page.evaluate(element => element.getAttribute("href"), g2UrlElement); let rating = 0.0; const ratingElement = await divCard.$("span[class='fw-semibold']"); if (ratingElement) { rating = await page.evaluate(element => element.textContent, ratingElement); } const descriptionElement = await divCard.$("p"); const description = await page.evaluate(element => element.textContent, descriptionElement) const businessInfo = { name: name, stars: rating, g2_url: g2Url, description: description }; await writeToCsv([businessInfo], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, pages, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch() while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processBusiness(browser, row, location, retries = 3) { const url = row.g2_url; let tries = 0; let success = false; while (tries <= retries && !success) 
{ const page = await browser.newPage(); try { await page.goto(url); const reviewCards = await page.$$("div[class='paper paper--white paper--box mb-2 position-relative border-bottom']"); let anonCount = 0; for (const reviewCard of reviewCards) { reviewDateElement = await reviewCard.$("time"); reviewTextElement = await reviewCard.$("div[itemprop='reviewBody']"); if (reviewDateElement && reviewTextElement) { const date = await page.evaluate(element => element.getAttribute("datetime"), reviewDateElement); const reviewBody = await page.evaluate(element => element.textContent, reviewTextElement); const nameElement = await reviewCard.$("a[class='link--header-color']"); let name; if (nameElement) { name = await page.evaluate(element => element.textContent, nameElement); } else { name = `anonymous-${anonCount}`; anonCount++; } const jobTitleElement = await reviewCard.$("div[class='mt-4th']"); let jobTitle; if (jobTitleElement) { jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); } else { jobTitle = "n/a"; } const ratingContainer = await reviewCard.$("div[class='f-1 d-f ai-c mb-half-small-only']"); const ratingDiv = await ratingContainer.$("div"); const ratingClass = await page.evaluate(element => element.getAttribute("class"), ratingDiv); const ratingArray = ratingClass.split("-"); const rating = Number(ratingArray[ratingArray.length-1])/2; const infoContainer = await reviewCard.$("div[class='tags--teal']"); const incentivesDirty = await infoContainer.$$("div"); const incentivesClean = []; let source = ""; for (const incentive of incentivesDirty) { const text = await page.evaluate(element => element.textContent, incentive); if (!incentivesClean.includes(text)) { if (text.includes("Review source:")) { textArray = text.split(": "); source = textArray[textArray.length-1]; } else { incentivesClean.push(text); } } } const validated = incentivesClean.includes("Validated Reviewer"); const incentivized = incentivesClean.includes("Incentivized Review"); const reviewData = { name: name, date: date, job_title: jobTitle, rating: rating, full_review: reviewBody, review_source: source, validated: validated, incentivized: incentivized } await writeToCsv([reviewData], `${row.name.replace(" ", "-")}.csv`); } } success = true; } catch (err) { console.log(`Error: ${err}, tries left: ${retries-tries}`); tries++; } finally { await page.close(); } } } async function processResults(csvFile, location, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); for (const business of businesses) { await processBusiness(browser, business, location, retries); } await browser.close(); } async function main() { const keywords = ["online bank"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); await startScrape(keyword, pages, location, concurrencyLimit, retries); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); }} main();
concurrencyLimit
to open many pages simultaneously. The largest difference here is our array. Instead of an array of page numbers, we have a much larger array of CSV rows.async function processResults(csvFile, location, concurrencyLimit, retries) { const businesses = await readCsv(csvFile); const browser = await puppeteer.launch(); while (businesses.length > 0) { const currentBatch = businesses.splice(0, concurrencyLimit); const tasks = currentBatch.map(business => processBusiness(browser, business, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close(); }
In processResults() above, our for loop has been removed and replaced with concurrent batches that use async and await. Aside from the changes here, the rest of our code remains basically the same!

Our getScrapeOpsUrl() function is already written, so to route these review requests through the proxy we just need to change one line:

await page.goto(getScrapeOpsUrl(url, location));
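In context, the top of processBusiness now looks something like the sketch below. The 60-second timeout comes from the finished script further down; requests routed through the proxy tend to take longer than direct ones, so Puppeteer's default 30-second navigation timeout can be too tight.

// Sketch: processBusiness now navigates through the ScrapeOps proxy
const page = await browser.newPage();
try {
    // Route the g2_url through the proxy and allow extra time for the response
    await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 });
    // ... parse the review cards exactly as before ...
} finally {
    await page.close();
}

The complete script, with both the proxy integration and concurrency in place, is below.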
const puppeteer = require("puppeteer");
const createCsvWriter = require("csv-writer").createObjectCsvWriter;
const csvParse = require("csv-parse");
const fs = require("fs");

const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key;

console.log("api key:", API_KEY);

// Append an array of objects to a CSV file, writing the header only once
async function writeToCsv(data, outputFile) {
    if (!data || data.length === 0) {
        throw new Error("No data to write!");
    }
    const fileExists = fs.existsSync(outputFile);
    const headers = Object.keys(data[0]).map(key => ({id: key, title: key}));
    const csvWriter = createCsvWriter({
        path: outputFile,
        header: headers,
        append: fileExists
    });
    try {
        await csvWriter.writeRecords(data);
    } catch (e) {
        throw new Error("Failed to write to csv");
    }
}

// Read a CSV file into an array of row objects
async function readCsv(inputFile) {
    const results = [];
    const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({
        columns: true,
        delimiter: ",",
        trim: true,
        skip_empty_lines: true
    }));
    for await (const record of parser) {
        results.push(record);
    }
    return results;
}

// Build an array of integers from start (inclusive) to end (exclusive)
function range(start, end) {
    const array = [];
    for (let i = start; i < end; i++) {
        array.push(i);
    }
    return array;
}

// Wrap a target URL with the ScrapeOps proxy endpoint
function getScrapeOpsUrl(url, location="us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

// Crawl one page of G2 search results and save each listing to the crawl CSV
async function scrapeSearchResults(browser, keyword, pageNumber, location="us", retries=3) {
    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const formattedKeyword = keyword.replace(" ", "+");
        const page = await browser.newPage();
        try {
            const url = `https://www.g2.com/search?page=${pageNumber+1}&query=${formattedKeyword}`;
            const proxyUrl = getScrapeOpsUrl(url, location);
            console.log(proxyUrl);
            await page.goto(proxyUrl);

            console.log(`Successfully fetched: ${url}`);

            const divCards = await page.$$("div[class='product-listing mb-1 border-bottom']");

            for (const divCard of divCards) {
                const nameElement = await divCard.$("div[class='product-listing__product-name']");
                const name = await page.evaluate(element => element.textContent, nameElement);

                const g2UrlElement = await nameElement.$("a");
                const g2Url = await page.evaluate(element => element.getAttribute("href"), g2UrlElement);

                let rating = 0.0;
                const ratingElement = await divCard.$("span[class='fw-semibold']");
                if (ratingElement) {
                    rating = await page.evaluate(element => element.textContent, ratingElement);
                }

                const descriptionElement = await divCard.$("p");
                const description = await page.evaluate(element => element.textContent, descriptionElement);

                const businessInfo = {
                    name: name,
                    stars: rating,
                    g2_url: g2Url,
                    description: description
                };
                await writeToCsv([businessInfo], `${keyword.replace(" ", "-")}.csv`);
            }
            success = true;
        } catch (err) {
            console.log(`Error: ${err}, tries left ${retries - tries}`);
            tries++;
        } finally {
            await page.close();
        }
    }
}

// Crawl all requested result pages in concurrent batches
async function startScrape(keyword, pages, location, concurrencyLimit, retries) {
    const pageList = range(0, pages);
    const browser = await puppeteer.launch();

    while (pageList.length > 0) {
        const currentBatch = pageList.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }

    await browser.close();
}

// Scrape the reviews for a single business from the crawl CSV
async function processBusiness(browser, row, location, retries = 3) {
    const url = row.g2_url;
    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const page = await browser.newPage();
        try {
            await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 });

            const reviewCards = await page.$$("div[class='paper paper--white paper--box mb-2 position-relative border-bottom']");
            let anonCount = 0;

            for (const reviewCard of reviewCards) {
                // Only parse cards that actually contain a date and a review body
                const reviewDateElement = await reviewCard.$("time");
                const reviewTextElement = await reviewCard.$("div[itemprop='reviewBody']");
                if (reviewDateElement && reviewTextElement) {
                    const date = await page.evaluate(element => element.getAttribute("datetime"), reviewDateElement);
                    const reviewBody = await page.evaluate(element => element.textContent, reviewTextElement);

                    const nameElement = await reviewCard.$("a[class='link--header-color']");
                    let name;
                    if (nameElement) {
                        name = await page.evaluate(element => element.textContent, nameElement);
                    } else {
                        name = `anonymous-${anonCount}`;
                        anonCount++;
                    }

                    const jobTitleElement = await reviewCard.$("div[class='mt-4th']");
                    let jobTitle;
                    if (jobTitleElement) {
                        jobTitle = await page.evaluate(element => element.textContent, jobTitleElement);
                    } else {
                        jobTitle = "n/a";
                    }

                    // G2 encodes the star rating in the last CSS class; divide by 2 for the real score
                    const ratingContainer = await reviewCard.$("div[class='f-1 d-f ai-c mb-half-small-only']");
                    const ratingDiv = await ratingContainer.$("div");
                    const ratingClass = await page.evaluate(element => element.getAttribute("class"), ratingDiv);
                    const ratingArray = ratingClass.split("-");
                    const rating = Number(ratingArray[ratingArray.length-1])/2;

                    // Review source and incentive badges live in the "tags--teal" container
                    const infoContainer = await reviewCard.$("div[class='tags--teal']");
                    const incentivesDirty = await infoContainer.$$("div");
                    const incentivesClean = [];
                    let source = "";
                    for (const incentive of incentivesDirty) {
                        const text = await page.evaluate(element => element.textContent, incentive);
                        if (!incentivesClean.includes(text)) {
                            if (text.includes("Review source:")) {
                                const textArray = text.split(": ");
                                source = textArray[textArray.length-1];
                            } else {
                                incentivesClean.push(text);
                            }
                        }
                    }
                    const validated = incentivesClean.includes("Validated Reviewer");
                    const incentivized = incentivesClean.includes("Incentivized Review");

                    const reviewData = {
                        name: name,
                        date: date,
                        job_title: jobTitle,
                        rating: rating,
                        full_review: reviewBody,
                        review_source: source,
                        validated: validated,
                        incentivized: incentivized
                    };
                    await writeToCsv([reviewData], `${row.name.replace(" ", "-")}.csv`);
                }
            }
            success = true;
        } catch (err) {
            console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`);
            tries++;
        } finally {
            await page.close();
        }
    }
}

// Scrape reviews for every business in the crawl CSV, in concurrent batches
async function processResults(csvFile, location, concurrencyLimit, retries) {
    const businesses = await readCsv(csvFile);
    const browser = await puppeteer.launch();

    while (businesses.length > 0) {
        const currentBatch = businesses.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(business => processBusiness(browser, business, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }

    await browser.close();
}

async function main() {
    const keywords = ["online bank"];
    const concurrencyLimit = 5;
    const pages = 1;
    const location = "us";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        await startScrape(keyword, pages, location, concurrencyLimit, retries);
        console.log("Crawl complete");
        aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`);
    }

    console.log("Starting scrape");
    for (const file of aggregateFiles) {
        await processResults(file, location, concurrencyLimit, retries);
    }
    console.log("Scrape complete");
}

main();
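Before you run the finished script, make sure there is a config.json file in the same folder as your scraper; the first few lines of the script read your ScrapeOps API key from it. A minimal config (the key value below is just a placeholder) looks like this:

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}

Then run the scraper with node and whatever you named the file, for example node g2-scraper.js (the filename here is only an example).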
async function main() {
    const keywords = ["online bank"];
    const concurrencyLimit = 5;
    const pages = 10;
    const location = "us";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        await startScrape(keyword, pages, location, concurrencyLimit, retries);
        console.log("Crawl complete");
        aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`);
    }

    for (const file of aggregateFiles) {
        await processResults(file, location, concurrencyLimit, retries);
    }
}
For this run, we set our pages to 10, our location to "us", and our concurrencyLimit to 5. Here are the results.

As always when scraping the web, you should pay attention to the target site's Terms of Service and its robots.txt. You can view G2's terms here and their robots.txt is available here.

Always be careful about the information you extract, and don't scrape private or confidential data. If a website is hidden behind a login, that is generally considered private data. If your data does not require a login, it is generally considered to be public data. If you have questions about the legality of your scraping job, it is best to consult an attorney familiar with the laws and localities you're dealing with.

You should now have a solid grasp of the page.$(), page.$$(), and page.evaluate() methods from Puppeteer, and you should understand some pretty complex string operations for extracting data, such as split(), replace(), and includes().

To learn more about the tools we used in this article, check out the links below:
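As a quick recap of those string operations, here is a short, standalone sketch of how they were used in this scraper. The sample values are hypothetical, but they follow the same shapes the code above parses.

// Standalone recap of the string operations used above (sample values are hypothetical)
const keyword = "online bank";
console.log(keyword.replace(" ", "-"));                        // "online-bank" -- used to build CSV filenames

const ratingClass = "stars stars-8";                           // example class string from a rating div
const ratingArray = ratingClass.split("-");
console.log(Number(ratingArray[ratingArray.length - 1]) / 2);  // 4 -- G2 stores the star rating doubled

const tag = "Review source: Organic";                          // example tag text from a review card
console.log(tag.includes("Review source:"));                   // true
console.log(tag.split(": ")[1]);                               // "Organic"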