Skip to main content

How to Scrape G2 with Selenium

At G2, we get the ability to view in-depth and verified reviews for tons of different businesses. If you're looking for a decent review site, G2 is one of the best. We get a treasure trove of information with all sorts of details about the reviews and reviewers.

By following along with this article, you'll be able to retrieve all sorts of data from G2 and you'll learn how to do the following when building scrapers in the future.


TLDR - How to Scrape G2

When we need to scrape G2, all the information we want comes deeply nested within the HTML elements on the page. Extracting this information is difficult. Luckily, if you want a ready-to-go G2 scraper, we've got one for you right here. All you need to do is create a config.json file with your ScrapeOps API key.

This script will perform a search based on any keywords in the keywords_list and then generate a detailed report on businesses that match that keyword.

After generating the report, the scraper reads it and does a detailed search report on each individual business from the original report.

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

# Chrome options shared by every webdriver instance; "--headless" is the only flag set.
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")

# Placeholder; overwritten below with the key read from config.json.
API_KEY = ""

# config.json must exist in the working directory and contain an "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` in a ScrapeOps proxy request URL.

    Args:
        url: The target page to fetch through the proxy.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The ``https://proxy.scrapeops.io/v1/`` URL with the API key, target
        URL and country encoded in its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Forward the caller's location; it used to be hard-coded to "us",
        # which silently ignored the ``location`` argument.
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One business listing parsed from a G2 search results page.

    After construction every string field is stripped of surrounding
    whitespace, and blank values are replaced with ``"No <field name>"``.
    """

    name: str = ""
    stars: float = 0
    g2_url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and substitute a default for blank ones."""
        # The loop variable is deliberately not called ``field`` so it does
        # not shadow ``dataclasses.field`` imported at the top of the file.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text.
            cleaned = value.strip()
            setattr(self, data_field.name, cleaned or f"No {data_field.name}")

@dataclass
class ReviewData:
    """One review parsed from a G2 business page.

    After construction every string field is stripped of surrounding
    whitespace, and blank values are replaced with ``"No <field name>"``.
    Non-string fields (rating and the two flags) are left untouched.
    """

    name: str = ""
    date: str = ""
    job_title: str = ""
    rating: float = 0
    full_review: str = ""
    review_source: str = ""
    validated: bool = False
    incentivized: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and substitute a default for blank ones."""
        # The loop variable is deliberately not called ``field`` so it does
        # not shadow ``dataclasses.field`` imported at the top of the file.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text.
            cleaned = value.strip()
            setattr(self, data_field.name, cleaned or f"No {data_field.name}")


class DataPipeline:
    """Queue scraped dataclass items, drop repeated names, append to a CSV file.

    Items are held in memory and flushed to ``csv_filename`` whenever the
    queue reaches ``storage_queue_limit`` and again when the pipeline is closed.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            # Column order follows the dataclass field order of the first item.
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset the flag on every exit path; it used to stay True after the
            # empty-queue early return or a failed write.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with this ``name`` was already seen; record new names."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once when scraping is finished."""
        if self.csv_file_open:
            # Imported locally: the file's import block does not import
            # ``time``, so this line used to raise NameError.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of G2 search results into ``data_pipeline``.

    Args:
        keyword: Search phrase; spaces become ``+`` in the query string.
        location: Passed to ``get_scrapeops_url`` for the proxied request.
        page_number: Zero-based page index; the URL uses ``page_number + 1``.
        data_pipeline: Receives one ``SearchData`` per listing via ``add_data``.
        retries: How many times to retry after a failed attempt.

    Raises:
        Exception: When every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        # A fresh browser per attempt; it is always closed in ``finally``.
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {url}")

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
            for div_card in div_cards:
                name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
                g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")

                # Listings without a rating element keep the 0.0 default.
                rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
                rating = rating_elements[0].text if rating_elements else 0.0

                description = div_card.find_element(By.CSS_SELECTOR, "p").text

                search_data = SearchData(
                    name=name.text,
                    stars=rating,
                    g2_url=g2_url,
                    description=description
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt. Without this the loop never ended for a
            # page that keeps failing and the raise below was unreachable.
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search-result pages for ``keyword`` on a thread pool.

    Args:
        keyword: Search phrase handed to ``scrape_search_results``.
        pages: Number of pages to scrape; page indexes run from 0 to ``pages - 1``.
        location: Location handed to ``scrape_search_results``.
        data_pipeline: Pipeline shared by every worker.
        max_threads: Maximum number of worker threads.
        retries: Retry budget handed to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(
                scrape_search_results, keyword, location, page_number, data_pipeline, retries
            ): page_number
            for page_number in range(pages)
        }
        # executor.map() only re-raises a worker's exception when its results
        # are iterated. They never were, so failed pages vanished silently.
        # Log each failure instead of dropping it.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logger.error(f"Scraping page {futures[future] + 1} for '{keyword}' failed: {error}")


def process_business(row, location, retries=3):
    """Scrape the reviews on one business page into ``<business name>.csv``.

    Args:
        row: Mapping with at least ``"g2_url"`` and ``"name"`` keys, as read
            from the crawl CSV.
        location: Passed to ``get_scrapeops_url`` for the proxied request.
        retries: How many times to retry after a failed attempt.

    Raises:
        Exception: When every attempt fails.
    """
    url = row["g2_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            # Fetch inside the try so a failed page load is retried and the
            # browser is still closed by the finally clause.
            driver.get(get_scrapeops_url(url, location=location))
            review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']")

            review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            anon_count = 0
            for review_card in review_cards:
                review_date = review_card.find_elements(By.CSS_SELECTOR, "time")
                has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0
                # Cards without a date or a review body are skipped.
                if not review_date or not has_text:
                    continue

                date = review_date[0].get_attribute("datetime")
                name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']")
                name = name_array[0].text if name_array else "anonymous"
                if name == "anonymous":
                    # The pipeline drops items whose name repeats, so number
                    # the anonymous reviewers to keep each of them.
                    name = f"{name}-{anon_count}"
                    anon_count += 1

                job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']")
                job_title = job_title_array[0].text if job_title_array else "n/a"

                rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']")
                rating_div = rating_container.find_element(By.CSS_SELECTOR, "div")
                stars_clean_number = _stars_from_class(rating_div.get_attribute("class"))

                review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text

                info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']")
                incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div")
                incentives_clean = []
                source = ""
                for incentive in incentives_dirty:
                    # Read the element text once instead of once per check.
                    incentive_text = incentive.text
                    if incentive_text in incentives_clean:
                        continue
                    if "Review source:" in incentive_text:
                        source = incentive_text.split(": ")[-1]
                    else:
                        incentives_clean.append(incentive_text)
                validated = "Validated Reviewer" in incentives_clean
                incentivized = "Incentivized Review" in incentives_clean

                review_data = ReviewData(
                    name=name,
                    date=date,
                    job_title=job_title,
                    rating=stars_clean_number,
                    full_review=review_body,
                    review_source=source,
                    validated=validated,
                    incentivized=incentivized
                )
                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['g2_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['g2_url']}")


def _stars_from_class(class_attr):
    """Turn a rating element's class attribute (ending in ``stars-N``) into a 0-5 rating."""
    # The rating is the last class token, e.g. "stars-8", and N is double the
    # rating. Indexing the raw string with [-1] only kept its final character,
    # so "stars-10" was read as 0.
    stars_token = class_attr.split()[-1]
    return float(stars_token.split("-")[-1]) / 2




def process_results(csv_file, location, max_threads=5, retries=3):
    """Run ``process_business`` on every row of a crawl CSV using a thread pool.

    Args:
        csv_file: Path of the CSV written by the crawl.
        location: Location handed to ``process_business``.
        max_threads: Maximum number of worker threads.
        retries: Retry budget handed to each worker.
    """
    logger.info(f"processing {csv_file}")
    # The pipeline writes its CSV as UTF-8, so read it back the same way
    # rather than with the platform default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(process_business, row, location, retries): row
            for row in reader
        }
        # executor.map() results were never iterated, so a worker's exception
        # was discarded silently. Log each failure instead.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logger.error(f"Failed to process {futures[future].get('g2_url')}: {error}")

if __name__ == "__main__":

    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    # Names of the search-result CSV files produced by the crawl.
    aggregate_files = []

    ## Job Processes
    # Crawl: one CSV of search results per keyword, named after the keyword.
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    # Scrape: read each crawl CSV back and process every business row in it.
    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

If you'd like to tweak this scraper, feel free to change any of the following below:

  • keyword_list: Contains a list of keywords to be searched and scraped.
  • MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
  • MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
  • PAGES: Specifies the number of pages to scrape for each keyword.
  • LOCATION: Defines the geographic location from which the scraping requests appear to originate.

How To Architect Our G2 Scraper

To scrape G2, we are actually going to build two different scrapers.

  1. Our first one will be the crawler. The crawler performs a search and parses the results. It then takes each business from these results and saves it to a CSV file.
  2. After the crawler, we build our scraper. The purpose of the scraper is to read the CSV file. After it reads the CSV file, the scraper will go through and scrape detailed reviews of every business collected during the crawl.

The crawler generates a detailed list of businesses. The scraper then gets detailed reviews for each business.

For the best performance and stability, each of these scrapers will need the following:

  • Parsing: so we can pull proper information from a page.
  • Pagination: so we can pull up different pages and be more selective about our data.
  • Data Storage: to store our data in a safe, efficient and readable way.
  • Concurrency: to scrape multiple pages at once.
  • Proxy Integration: when scraping anything at scale, we often face the issue of getting blocked. Proxies allow us a redundant connection and reduce our likelihood of getting blocked by different websites.

Understanding How To Scrape G2

Step 1: How To Request G2 Pages

Here is a standard G2 URL:

https://www.g2.com/search?query=online+bank

https://www.g2.com/search? holds the first part of our URL. Our query gets added onto the end: query=online+bank. We can also add more parameters with &.

Take a look at the search below for online bank.

G2 Search Results

Once we've got our search results, we need to create a report about each business from those results. Each business has its own page on G2. The URL for each business is typically constructed like this:

https://www.g2.com/products/name-of-business/reviews

Below is a screenshot of one of G2's individual business pages.

G2 Business Details Page


Step 2: How To Extract Data From G2 Results and Pages

Our G2 data is very deeply nested within the page. Below is a screenshot of the name of a business nested within the page. All in all the results page isn't too difficult to parse through, we're only going to be taking 4 pieces of data from each result.

g2 HTML Inspection

Extracting data from the individual business pages is much more difficult. Take a look below:

g2 HTML Inspection Business Page

Pay close attention here.

  • Specifically, look at stars-8 at the end of the class name.
  • The rating of the review is actually hidden within our CSS class. The 8 is actually our 4.0 rating... doubled.
  • stars-10 would be a 5 star rating.
  • stars-9 would be 4.5 stars, stars-8, is 4 stars... you get the idea.
  • The stars-number is always double the actual rating.

Step 3: How To Control Pagination

Now, we need pagination. To paginate our results, we need to fetch them in uniform-sized batches.

If we want all the results from page 1, we fetch page 1. If we want page 2, we fetch page 2. We repeat this process until we've got our desired data. In order to accomplish this, we need to add the page parameter.

Our updated URL should look like this:

https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}

As mentioned earlier, our url for an individual business gets constructed in this format:

https://www.g2.com/products/name-of-business/reviews

Once we're building our URLs properly, it's almost time to start fetching and parsing our data.


Step 4: Geolocated Data

To handle geolocated data, we'll be using the ScrapeOps Proxy API. If we want to be in Great Britain, we simply set our country parameter to "uk"; if we want to be in the US, we can set this param to "us".

When we pass our country into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, our geolocation will show up correctly!


Setting Up Our G2 Scraper Project

Let's get started. You can run the following commands to get setup.

Create a New Project Folder

mkdir g2-scraper

cd g2-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install selenium

Make sure you have webdriver installed! If you don't, you can check here


Build A G2 Search Crawler

Step 1: Create Simple Search Data Parser

In order to pull the data from a search page, we need to parse it. We're going to get setup and write some code that does exactly this.

In the code below, we do the following:

  • while we still have retries left and the operation hasn't succeeded:
    • driver.get(url) fetches the site
    • We then pull the name with div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
    • name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") gets the link to the business, g2_url
    • If there is a rating present on the page, we pull it from the page with rating_elements[0].text. If there is no rating present, we give it a default of 0.0
    • description = div_card.find_element(By.CSS_SELECTOR, "p").text gives us the description of the business
    • Finally, we print all of this information to the terminal
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

# Chrome options shared by every webdriver instance; "--headless" is the only flag set.
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")

# Placeholder; overwritten below with the key read from config.json.
API_KEY = ""

# config.json must exist in the working directory and contain an "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, retries=3):
    """Fetch the G2 search results for ``keyword`` and print each listing.

    Args:
        keyword: Search phrase; spaces become ``+`` in the query string.
        location: Accepted for interface parity; not used by this version.
        retries: How many times to retry after a failed attempt.

    Raises:
        Exception: When every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        # A fresh browser per attempt; it is always closed in ``finally``.
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(url)
            driver.save_screenshot("test.png")
            logger.info(f"Fetched {url}")

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
            for div_card in div_cards:
                name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
                g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")

                # Listings without a rating element keep the 0.0 default.
                rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
                rating = rating_elements[0].text if rating_elements else 0.0

                description = div_card.find_element(By.CSS_SELECTOR, "p").text

                search_data = {
                    "name": name.text,
                    "stars": rating,
                    "g2_url": g2_url,
                    "description": description
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt. Without this the loop never ended for a
            # page that keeps failing and the raise below was unreachable.
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    # Run settings. MAX_THREADS and PAGES are defined but not used by this version.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    # Search each keyword once; results are printed, not stored.
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")

The code above finds and prints all the basic data for each business: name, stars, g2_url, and description. We'll use this information to create uniform objects representing each business from the search results.

This information is the very foundation for our crawler report.


Step 2: Add Pagination

It's almost time to store our data, but before we do this, we need pagination. As you probably remember, we can paginate our results by the url. Our new URL will look like this:

https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}

We use page_number+1 because start_scrape() begins counting at zero.

Take a look at the updated code below:

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

# Chrome options shared by every webdriver instance; "--headless" is the only flag set.
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")

# Placeholder; overwritten below with the key read from config.json.
API_KEY = ""

# config.json must exist in the working directory and contain an "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, page_number, retries=3):
    """Fetch one page of G2 search results for ``keyword`` and print each listing.

    Args:
        keyword: Search phrase; spaces become ``+`` in the query string.
        location: Accepted for interface parity; not used by this version.
        page_number: Zero-based page index; the URL uses ``page_number + 1``.
        retries: How many times to retry after a failed attempt.

    Raises:
        Exception: When every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        # A fresh browser per attempt; it is always closed in ``finally``.
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(url)
            logger.info(f"Fetched {url}")

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
            for div_card in div_cards:
                name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
                g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")

                # Listings without a rating element keep the 0.0 default.
                rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
                rating = rating_elements[0].text if rating_elements else 0.0

                description = div_card.find_element(By.CSS_SELECTOR, "p").text

                search_data = {
                    "name": name.text,
                    "stars": rating,
                    "g2_url": g2_url,
                    "description": description
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt. Without this the loop never ended for a
            # page that keeps failing and the raise below was unreachable.
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, max_threads=5, retries=3):
    """Scrape ``pages`` search-result pages for ``keyword``, one after another.

    ``max_threads`` is accepted but unused in this sequential version.
    """
    for page in range(pages):
        # Pass the loop variable; the call used to reference an undefined
        # ``page_number`` and raised NameError on the first page.
        scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":

    # Run settings. MAX_THREADS is defined but not passed on by this version.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    # Scrape PAGES result pages per keyword; results are printed, not stored.
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")

In the example above, we added page_number to scrape_search_results(). We also added a start_scrape() function which gives us the ability to scrape multiple pages.

Later on, we're going to add concurrency to this function. For the moment, we'll use a simple for loop as a placeholder.


Step 3: Storing the Scraped Data

For data storage, we need two classes: SearchData and DataPipeline. They might look a bit scary, but these classes are actually pretty simple.

  • SearchData is used to represent individual business objects.
  • The DataPipeline takes our SearchData as input.
    • Once the DataPipeline takes in our SearchData, it compares each object by name.
    • When two objects have the same name, the second one gets dropped. This approach works really well when removing duplicates.
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

# Chrome options shared by every webdriver instance; "--headless" is the only flag set.
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")

# Placeholder; overwritten below with the key read from config.json.
API_KEY = ""

# config.json must exist in the working directory and contain an "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One business listing parsed from a G2 search results page.

    After construction every string field is stripped of surrounding
    whitespace, and blank values are replaced with ``"No <field name>"``.
    """

    name: str = ""
    stars: float = 0
    g2_url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and substitute a default for blank ones."""
        # The loop variable is deliberately not called ``field`` so it does
        # not shadow ``dataclasses.field`` imported at the top of the file.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text.
            cleaned = value.strip()
            setattr(self, data_field.name, cleaned or f"No {data_field.name}")


class DataPipeline:
    """Queue scraped dataclass items, drop repeated names, append to a CSV file.

    Items are held in memory and flushed to ``csv_filename`` whenever the
    queue reaches ``storage_queue_limit`` and again when the pipeline is closed.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            # Column order follows the dataclass field order of the first item.
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset the flag on every exit path; it used to stay True after the
            # empty-queue early return or a failed write.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with this ``name`` was already seen; record new names."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once when scraping is finished."""
        if self.csv_file_open:
            # Imported locally: the file's import block does not import
            # ``time``, so this line used to raise NameError.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of G2 search results into ``data_pipeline``.

    Args:
        keyword: Search phrase; spaces become ``+`` in the query string.
        location: Accepted for interface parity; not used by this version.
        page_number: Zero-based page index; the URL uses ``page_number + 1``.
        data_pipeline: Receives one ``SearchData`` per listing via ``add_data``.
        retries: How many times to retry after a failed attempt.

    Raises:
        Exception: When every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        # A fresh browser per attempt; it is always closed in ``finally``.
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(url)
            logger.info(f"Fetched {url}")

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
            for div_card in div_cards:
                name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
                g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")

                # Listings without a rating element keep the 0.0 default.
                rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
                rating = rating_elements[0].text if rating_elements else 0.0

                description = div_card.find_element(By.CSS_SELECTOR, "p").text

                search_data = SearchData(
                    name=name.text,
                    stars=rating,
                    g2_url=g2_url,
                    description=description
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt. Without this the loop never ended for a
            # page that keeps failing and the raise below was unreachable.
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search-result pages for ``keyword``, one after another.

    ``max_threads`` is accepted but unused in this sequential version.
    """
    for page in range(pages):
        # Pass the loop variable; the call used to reference an undefined
        # ``page_number`` and raised NameError on the first page.
        scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

    # Run settings. MAX_THREADS is defined but not passed on by this version.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    # Names of the search-result CSV files produced by the crawl.
    aggregate_files = []

    ## Job Processes
    # Crawl: one CSV of search results per keyword, named after the keyword.
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
  • DataPipeline gives us an efficient pipeline to a CSV output file
  • SearchData objects become individual rows in our CSV file

Step 4: Adding Concurrency

A for loop isn't good enough if we want to run our crawler at scale in production.

The function below refactors our start_scrape() function to use ThreadPoolExecutor and take advantage of the multithreading offered by our CPU.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search-result pages for ``keyword`` on a thread pool.

    Args:
        keyword: Search phrase handed to ``scrape_search_results``.
        pages: Number of pages to scrape; page indexes run from 0 to ``pages - 1``.
        location: Location handed to ``scrape_search_results``.
        data_pipeline: Pipeline shared by every worker.
        max_threads: Maximum number of worker threads.
        retries: Retry budget handed to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(
                scrape_search_results, keyword, location, page_number, data_pipeline, retries
            ): page_number
            for page_number in range(pages)
        }
        # executor.map() only re-raises a worker's exception when its results
        # are iterated. They never were, so failed pages vanished silently.
        # Log each failure instead of dropping it.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logger.error(f"Scraping page {futures[future] + 1} for '{keyword}' failed: {error}")

Let's break down the arguments to executor.map()

  • scrape_search_results tells the executor to run this function on each thread
  • [keyword] * pages passes our keyword into executor.map() as a list
  • All of our other arguments get passed in as lists as well

Here is the fully updated code.

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

# Chrome options shared by every webdriver instance; "--headless" is the only flag set.
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")

# Placeholder; overwritten below with the key read from config.json.
API_KEY = ""

# config.json must exist in the working directory and contain an "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One business listing parsed from a G2 search results page.

    After construction every string field is stripped of surrounding
    whitespace, and blank values are replaced with ``"No <field name>"``.
    """

    name: str = ""
    stars: float = 0
    g2_url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and substitute a default for blank ones."""
        # The loop variable is deliberately not called ``field`` so it does
        # not shadow ``dataclasses.field`` imported at the top of the file.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text.
            cleaned = value.strip()
            setattr(self, data_field.name, cleaned or f"No {data_field.name}")


class DataPipeline:
    """Queue scraped dataclass items, drop repeated names, append to a CSV file.

    Items are held in memory and flushed to ``csv_filename`` whenever the
    queue reaches ``storage_queue_limit`` and again when the pipeline is closed.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            # Column order follows the dataclass field order of the first item.
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset the flag on every exit path; it used to stay True after the
            # empty-queue early return or a failed write.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with this ``name`` was already seen; record new names."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once when scraping is finished."""
        if self.csv_file_open:
            # Imported locally: the file's import block does not import
            # ``time``, so this line used to raise NameError.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of G2 search results into ``data_pipeline``.

    Args:
        keyword: Search phrase; spaces become ``+`` in the query string.
        location: Accepted for interface parity; not used by this version.
        page_number: Zero-based page index; the URL uses ``page_number + 1``.
        data_pipeline: Receives one ``SearchData`` per listing via ``add_data``.
        retries: How many times to retry after a failed attempt.

    Raises:
        Exception: When every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        # A fresh browser per attempt; it is always closed in ``finally``.
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.get(url)
            logger.info(f"Fetched {url}")

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
            for div_card in div_cards:
                name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
                g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")

                # Listings without a rating element keep the 0.0 default.
                rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
                rating = rating_elements[0].text if rating_elements else 0.0

                description = div_card.find_element(By.CSS_SELECTOR, "p").text

                search_data = SearchData(
                    name=name.text,
                    stars=rating,
                    g2_url=g2_url,
                    description=description
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt. Without this the loop never ended for a
            # page that keeps failing and the raise below was unreachable.
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search-result pages for ``keyword`` on a thread pool.

    Args:
        keyword: Search phrase handed to ``scrape_search_results``.
        pages: Number of pages to scrape; page indexes run from 0 to ``pages - 1``.
        location: Location handed to ``scrape_search_results``.
        data_pipeline: Pipeline shared by every worker.
        max_threads: Maximum number of worker threads.
        retries: Retry budget handed to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(
                scrape_search_results, keyword, location, page_number, data_pipeline, retries
            ): page_number
            for page_number in range(pages)
        }
        # executor.map() only re-raises a worker's exception when its results
        # are iterated. They never were, so failed pages vanished silently.
        # Log each failure instead of dropping it.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logger.error(f"Scraping page {futures[future] + 1} for '{keyword}' failed: {error}")


if __name__ == "__main__":

    # Run settings.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    # Names of the search-result CSV files produced by the crawl.
    aggregate_files = []

    ## Job Processes
    # Crawl: one CSV of search results per keyword, named after the keyword.
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

We can now crawl concurrently, so all of our data will come in much faster.


Step 5: Bypassing Anti-Bots

Anti-bots are special software designed to detect and block malicious bots. They protect against things such as DDOS attacks and other things like that. Our crawler isn't malicious, but it looks drastically different from a normal user. It makes dozens of requests in under a second. There is nothing human about that. In order to get past anti-bot software, we need the ScrapeOps API.

The function below uses simple string formatting and converts any regular url into a proxied one using the ScrapeOps Proxy API.

def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` so the request is routed through the ScrapeOps proxy.

    Args:
        url: Target page to fetch.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint URL with the payload URL-encoded in the query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Honor the caller's location; it used to be hard-coded to "us".
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

The ScrapeOps Proxy API rotates IP addresses and always gives us a server located in a country of our choice.

Each request we make is going to come from a different IP address. Instead of looking like one really weird user, our crawler looks like a random group of normal users.

Our code barely changes at all here, but we're now at a production-ready level. Take a look at the full code example below.

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` so the request is routed through the ScrapeOps proxy.

    Args:
        url: Target page to fetch.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint URL with the payload URL-encoded in the query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Honor the caller's location; it used to be hard-coded to "us".
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One business row taken from a G2 search results page."""

    name: str = ""
    stars: float = 0
    g2_url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty string fields a placeholder and strip whitespace from the rest."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values (e.g. a numeric rating) are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)


class DataPipeline:
    """Buffer scraped dataclass items, drop duplicates by name, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to ``csv_filename`` and empty the queue.

        A header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Always reset the flag: the empty-queue early return used to leave
            # it stuck on True, which disabled every later automatic flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True (and log a warning) if an item with the same ``name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; wait briefly first if a write is in progress."""
        if self.csv_file_open:
            # Imported here because the module never imports ``time``;
            # the original call raised NameError.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of G2 search results into ``data_pipeline``.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` for the query string.
        location: Country code passed to ``get_scrapeops_url``.
        page_number: Zero-based page index (requested from G2 as ``page_number + 1``).
        data_pipeline: Object whose ``add_data`` receives one ``SearchData`` per listing.
        retries: Number of retries allowed after the first failed attempt.

    Raises:
        Exception: If no attempt succeeds within the retry budget.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")

            for div_card in div_cards:
                name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
                g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")

                # Listings without a rating element keep the 0.0 default.
                rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
                rating = 0.0
                if rating_elements:
                    rating = rating_elements[0].text

                description = div_card.find_element(By.CSS_SELECTOR, "p").text

                search_data = SearchData(
                    name=name.text,
                    stars=rating,
                    g2_url=g2_url,
                    description=description
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt; without this a persistent failure
            # kept the loop retrying forever.
            tries += 1

        finally:
            # Release the browser after every attempt, successful or not.
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Fan the search pages out over a thread pool, one task per page.

    Pages ``0 .. pages - 1`` are each handed to ``scrape_search_results``;
    the call returns once every task has finished.
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_threads)
    with pool:
        for page_number in range(pages):
            pool.submit(
                scrape_search_results,
                keyword,
                location,
                page_number,
                data_pipeline,
                retries,
            )


if __name__ == "__main__":

    # Run configuration
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes: one CSV report per keyword
    for keyword in keyword_list:
        report_name = keyword.replace(" ", "-") + ".csv"

        crawl_pipeline = DataPipeline(csv_filename=report_name)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_name)
    logger.info("Crawl complete.")

Step 6: Production Run

Now, we'll run this crawler in production. Take a look at the main block below; we're going to scrape 10 pages.

if __name__ == "__main__":

    # Run configuration
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 10
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes: one CSV report per keyword
    for keyword in keyword_list:
        report_name = keyword.replace(" ", "-") + ".csv"

        crawl_pipeline = DataPipeline(csv_filename=report_name)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_name)
    logger.info("Crawl complete.")

PAGES is now set to 10, while LOCATION stays set to "us". Now, we need to process 10 pages of data.

Here are the results:

Crawler Results

All in all, it took roughly 30 seconds to process 10 pages of results...about 3 seconds per page.


Build A G2 Scraper

Our crawler builds reports based on our search criteria. Now that we're outputting a list of businesses, we need to extract detailed information about each of them. We can achieve this by building a scraper for these businesses.

Our scraper will do the following:

  1. Open the report we created
  2. Get the pages from that report
  3. Pull information from these pages
  4. Create an individual report for each of the businesses we've looked up

As we build the review scraper for this project, we'll once again use the following: parsing, storage, concurrency, and proxy integration.


Step 1: Create Simple Business Data Parser

Let's write a simple parser that reads a row of our CSV file and processes the business from that row. Take a look below.

def process_business(row, location, retries=3):
    """Parse the reviews on one business page and print each as a dict.

    Args:
        row: CSV row from the crawl report; ``row["g2_url"]`` is the page to open.
        location: Country code; not used until proxy support is added.
        retries: Number of retries allowed after the first failed attempt.

    Raises:
        Exception: If no attempt succeeds within the retry budget.
    """
    url = row["g2_url"]
    tries = 0
    success = False

    while tries <= retries and not success:

        driver = webdriver.Chrome(options=OPTIONS)

        try:
            # WebDriver.get() accepts only the URL; the stray ``location=``
            # keyword raised TypeError. Fetching inside the ``try`` also lets
            # a failed load be retried and the driver be closed.
            driver.get(url)
            review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']")

            anon_count = 0
            for review_card in review_cards:
                review_date = review_card.find_elements(By.CSS_SELECTOR, "time")
                has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0
                # Skip cards that lack a date or a review body.
                if not review_date or not has_text:
                    continue

                date = review_date[0].get_attribute("datetime")
                name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']")
                name = name_array[0].text if len(name_array) > 0 else "anonymous"
                if name == "anonymous":
                    # Number anonymous reviewers so their names stay distinct.
                    name = f"{name}-{anon_count}"
                    anon_count += 1

                job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']")
                job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a"

                rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']")
                rating_div = rating_container.find_element(By.CSS_SELECTOR, "div")

                rating_class = rating_div.get_attribute("class")

                # get_attribute returns one space-separated string, so take the
                # last class token; indexing [-1] directly gave only the last
                # character (e.g. "0" for "stars-10").
                stars_string = rating_class.split()[-1]
                stars_large_number = float(stars_string.split("-")[-1])
                stars_clean_number = stars_large_number/2

                review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text

                info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']")
                incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div")
                incentives_clean = []
                source = ""
                for incentive in incentives_dirty:
                    if incentive.text not in incentives_clean:
                        if "Review source:" in incentive.text:
                            source = incentive.text.split(": ")[-1]
                        else:
                            incentives_clean.append(incentive.text)
                validated = "Validated Reviewer" in incentives_clean
                incentivized = "Incentivized Review" in incentives_clean

                review_data = {
                    "name": name,
                    "date": date,
                    "job_title": job_title,
                    "rating": stars_clean_number,
                    "full_review": review_body,
                    "review_source": source,
                    "validated": validated,
                    "incentivized": incentivized
                }

                print(review_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['g2_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

        finally:
            # Release the browser after every attempt, successful or not.
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['g2_url']}")
  • Each review has a date. From each review, we pull the date with review_date[0].get_attribute("datetime")
  • Then, we check if the user's name is present. If it's not, we name the reviewer "anonymous" and give them a number. This prevents different anonymous reviews from getting filtered out as duplicates.
  • if len(job_title_array) > 0 else "n/a" checks if the job_title is present. If it is not, we give it a default value of "n/a". Otherwise we pull the user's job_title from the post.
  • rating_div.get_attribute("class") pulls the CSS class from our rating. We then split("-") to separate the number of stars from the CSS class. After splitting the stars, we divide them by 2 to get the actual rating.
  • review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text gives us the actual review
  • We created an incentives_dirty list to hold all of the incentive tags from the review. If "Review source:" is in the text of the incentive item, we split(": ") to separate the source name and pull it. All other non duplicate items get pushed into the incentives_clean list.
  • If "Validated Reviewer" or "Incentivized Review" is inside the incentives_clean list, we set those variables to True

To summarize, our parsing function takes in a row from our CSV file. Then, it fetches the g2_url for the business. Now that we can extract the correct data from the site, we're ready to read our CSV file and scrape this valuable data.


Step 2: Loading URLs To Scrape

To use process_business(), we need to read rows from the CSV file we created earlier. We're going to update our full code to do just that.

Take a look at the function below:

def process_results(csv_file, location, retries=3):
    """Read the crawl report and run ``process_business`` on every row.

    Args:
        csv_file: Path to the CSV report produced by the crawl.
        location: Country code forwarded to ``process_business``.
        retries: Retry budget forwarded to ``process_business``.
    """
    logger.info(f"processing {csv_file}")
    # The report is written as UTF-8, so read it back with the same encoding
    # instead of the platform default.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_business(row, location, retries=retries)

This function reads the CSV file and then converts all the rows into an array. As we iterate through the array, we pass each row into process_business(). You can view the fully updated code below.

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` so the request is routed through the ScrapeOps proxy.

    Args:
        url: Target page to fetch.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint URL with the payload URL-encoded in the query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Honor the caller's location; it used to be hard-coded to "us".
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One business row taken from a G2 search results page."""

    name: str = ""
    stars: float = 0
    g2_url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty string fields a placeholder and strip whitespace from the rest."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values (e.g. a numeric rating) are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)


class DataPipeline:
    """Buffer scraped dataclass items, drop duplicates by name, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to ``csv_filename`` and empty the queue.

        A header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Always reset the flag: the empty-queue early return used to leave
            # it stuck on True, which disabled every later automatic flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True (and log a warning) if an item with the same ``name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; wait briefly first if a write is in progress."""
        if self.csv_file_open:
            # Imported here because the module never imports ``time``;
            # the original call raised NameError.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of G2 search results into ``data_pipeline``.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` for the query string.
        location: Country code passed to ``get_scrapeops_url``.
        page_number: Zero-based page index (requested from G2 as ``page_number + 1``).
        data_pipeline: Object whose ``add_data`` receives one ``SearchData`` per listing.
        retries: Number of retries allowed after the first failed attempt.

    Raises:
        Exception: If no attempt succeeds within the retry budget.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")

            for div_card in div_cards:
                name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
                g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")

                # Listings without a rating element keep the 0.0 default.
                rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
                rating = 0.0
                if rating_elements:
                    rating = rating_elements[0].text

                description = div_card.find_element(By.CSS_SELECTOR, "p").text

                search_data = SearchData(
                    name=name.text,
                    stars=rating,
                    g2_url=g2_url,
                    description=description
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt; without this a persistent failure
            # kept the loop retrying forever.
            tries += 1

        finally:
            # Release the browser after every attempt, successful or not.
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Fan the search pages out over a thread pool, one task per page.

    Pages ``0 .. pages - 1`` are each handed to ``scrape_search_results``;
    the call returns once every task has finished.
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_threads)
    with pool:
        for page_number in range(pages):
            pool.submit(
                scrape_search_results,
                keyword,
                location,
                page_number,
                data_pipeline,
                retries,
            )


def process_business(row, location, retries=3):
    """Parse the reviews on one business page and print each as a dict.

    Args:
        row: CSV row from the crawl report; ``row["g2_url"]`` is the page to open.
        location: Country code; not used until proxy support is added.
        retries: Number of retries allowed after the first failed attempt.

    Raises:
        Exception: If no attempt succeeds within the retry budget.
    """
    url = row["g2_url"]
    tries = 0
    success = False

    while tries <= retries and not success:

        driver = webdriver.Chrome(options=OPTIONS)

        try:
            # WebDriver.get() accepts only the URL; the stray ``location=``
            # keyword raised TypeError. Fetching inside the ``try`` also lets
            # a failed load be retried and the driver be closed.
            driver.get(url)
            review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']")

            anon_count = 0
            for review_card in review_cards:
                review_date = review_card.find_elements(By.CSS_SELECTOR, "time")
                has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0
                # Skip cards that lack a date or a review body.
                if not review_date or not has_text:
                    continue

                date = review_date[0].get_attribute("datetime")
                name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']")
                name = name_array[0].text if len(name_array) > 0 else "anonymous"
                if name == "anonymous":
                    # Number anonymous reviewers so their names stay distinct.
                    name = f"{name}-{anon_count}"
                    anon_count += 1

                job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']")
                job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a"

                rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']")
                rating_div = rating_container.find_element(By.CSS_SELECTOR, "div")

                rating_class = rating_div.get_attribute("class")

                # get_attribute returns one space-separated string, so take the
                # last class token; indexing [-1] directly gave only the last
                # character (e.g. "0" for "stars-10").
                stars_string = rating_class.split()[-1]
                stars_large_number = float(stars_string.split("-")[-1])
                stars_clean_number = stars_large_number/2

                review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text

                info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']")
                incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div")
                incentives_clean = []
                source = ""
                for incentive in incentives_dirty:
                    if incentive.text not in incentives_clean:
                        if "Review source:" in incentive.text:
                            source = incentive.text.split(": ")[-1]
                        else:
                            incentives_clean.append(incentive.text)
                validated = "Validated Reviewer" in incentives_clean
                incentivized = "Incentivized Review" in incentives_clean

                review_data = {
                    "name": name,
                    "date": date,
                    "job_title": job_title,
                    "rating": stars_clean_number,
                    "full_review": review_body,
                    "review_source": source,
                    "validated": validated,
                    "incentivized": incentivized
                }

                print(review_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['g2_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

        finally:
            # Release the browser after every attempt, successful or not.
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['g2_url']}")




def process_results(csv_file, location, retries=3):
    """Read the crawl report and run ``process_business`` on every row.

    Args:
        csv_file: Path to the CSV report produced by the crawl.
        location: Country code forwarded to ``process_business``.
        retries: Retry budget forwarded to ``process_business``.
    """
    logger.info(f"processing {csv_file}")
    # The report is written as UTF-8, so read it back with the same encoding
    # instead of the platform default.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_business(row, location, retries=retries)

if __name__ == "__main__":

    # Run configuration
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes: one CSV report per keyword
    for keyword in keyword_list:
        report_name = keyword.replace(" ", "-") + ".csv"

        crawl_pipeline = DataPipeline(csv_filename=report_name)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_name)
    logger.info("Crawl complete.")

    # Scrape the reviews for every business listed in each report
    for report in aggregate_files:
        process_results(report, LOCATION, retries=MAX_RETRIES)

In the code above, process_results() reads rows from our CSV file. It then passes each of these rows into process_business(). process_business() extracts our data and then prints it to the terminal.


Step 3: Storing the Scraped Data

We're getting the data we need. Now we need to store it... Sound familiar? Our DataPipeline is already built for this, but we need another @dataclass. Take a look at the snippet below, it's our ReviewData.

@dataclass
class ReviewData:
    """A single review scraped from a business's G2 page."""

    name: str = ""
    date: str = ""
    job_title: str = ""
    rating: float = 0
    full_review: str = ""
    review_source: str = ""
    validated: bool = False
    incentivized: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty string fields a placeholder and strip whitespace from the rest."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Numeric and boolean fields are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)

Our ReviewData uses the following fields to represent reviews on the page:

  • name: str
  • date: str
  • job_title: str
  • rating: float
  • full_review: str
  • review_source: str
  • validated: bool
  • incentivized: bool

In the updated code below, we create a new DataPipeline and pass our ReviewData object into it.

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` so the request is routed through the ScrapeOps proxy.

    Args:
        url: Target page to fetch.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint URL with the payload URL-encoded in the query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Honor the caller's location; it used to be hard-coded to "us".
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One business row taken from a G2 search results page."""

    name: str = ""
    stars: float = 0
    g2_url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty string fields a placeholder and strip whitespace from the rest."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values (e.g. a numeric rating) are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)

@dataclass
class ReviewData:
    """A single review scraped from a business's G2 page."""

    name: str = ""
    date: str = ""
    job_title: str = ""
    rating: float = 0
    full_review: str = ""
    review_source: str = ""
    validated: bool = False
    incentivized: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty string fields a placeholder and strip whitespace from the rest."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Numeric and boolean fields are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)


class DataPipeline:
    """Buffer scraped dataclass items, drop duplicates by name, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to ``csv_filename`` and empty the queue.

        A header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Always reset the flag: the empty-queue early return used to leave
            # it stuck on True, which disabled every later automatic flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True (and log a warning) if an item with the same ``name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; wait briefly first if a write is in progress."""
        if self.csv_file_open:
            # Imported here because the module never imports ``time``;
            # the original call raised NameError.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of G2 search results into ``data_pipeline``.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` for the query string.
        location: Country code passed to ``get_scrapeops_url``.
        page_number: Zero-based page index (requested from G2 as ``page_number + 1``).
        data_pipeline: Object whose ``add_data`` receives one ``SearchData`` per listing.
        retries: Number of retries allowed after the first failed attempt.

    Raises:
        Exception: If no attempt succeeds within the retry budget.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")

            for div_card in div_cards:
                name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
                g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")

                # Listings without a rating element keep the 0.0 default.
                rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
                rating = 0.0
                if rating_elements:
                    rating = rating_elements[0].text

                description = div_card.find_element(By.CSS_SELECTOR, "p").text

                search_data = SearchData(
                    name=name.text,
                    stars=rating,
                    g2_url=g2_url,
                    description=description
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt; without this a persistent failure
            # kept the loop retrying forever.
            tries += 1

        finally:
            # Release the browser after every attempt, successful or not.
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Fan the search pages out over a thread pool, one task per page.

    Pages ``0 .. pages - 1`` are each handed to ``scrape_search_results``;
    the call returns once every task has finished.
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_threads)
    with pool:
        for page_number in range(pages):
            pool.submit(
                scrape_search_results,
                keyword,
                location,
                page_number,
                data_pipeline,
                retries,
            )


def process_business(row, location, retries=3):
    """Parse the reviews on one business page and store them in a per-business CSV.

    The output file is named after ``row["name"]`` with spaces replaced by
    hyphens.

    Args:
        row: CSV row from the crawl report; uses ``row["g2_url"]`` and ``row["name"]``.
        location: Country code; not used until proxy support is added.
        retries: Number of retries allowed after the first failed attempt.

    Raises:
        Exception: If no attempt succeeds within the retry budget.
    """
    url = row["g2_url"]
    tries = 0
    success = False

    while tries <= retries and not success:

        driver = webdriver.Chrome(options=OPTIONS)

        try:
            # WebDriver.get() accepts only the URL; the stray ``location=``
            # keyword raised TypeError. Fetching inside the ``try`` also lets
            # a failed load be retried and the driver be closed.
            driver.get(url)
            review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']")

            review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            anon_count = 0
            for review_card in review_cards:
                review_date = review_card.find_elements(By.CSS_SELECTOR, "time")
                has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0
                # Skip cards that lack a date or a review body.
                if not review_date or not has_text:
                    continue

                date = review_date[0].get_attribute("datetime")
                name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']")
                name = name_array[0].text if len(name_array) > 0 else "anonymous"
                if name == "anonymous":
                    # Number anonymous reviewers so the pipeline's name-based
                    # duplicate filter does not drop them.
                    name = f"{name}-{anon_count}"
                    anon_count += 1

                job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']")
                job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a"

                rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']")
                rating_div = rating_container.find_element(By.CSS_SELECTOR, "div")

                rating_class = rating_div.get_attribute("class")

                # get_attribute returns one space-separated string, so take the
                # last class token; indexing [-1] directly gave only the last
                # character (e.g. "0" for "stars-10").
                stars_string = rating_class.split()[-1]
                stars_large_number = float(stars_string.split("-")[-1])
                stars_clean_number = stars_large_number/2

                review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text

                info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']")
                incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div")
                incentives_clean = []
                source = ""
                for incentive in incentives_dirty:
                    if incentive.text not in incentives_clean:
                        if "Review source:" in incentive.text:
                            source = incentive.text.split(": ")[-1]
                        else:
                            incentives_clean.append(incentive.text)
                validated = "Validated Reviewer" in incentives_clean
                incentivized = "Incentivized Review" in incentives_clean

                review_data = ReviewData(
                    name=name,
                    date=date,
                    job_title=job_title,
                    rating=stars_clean_number,
                    full_review=review_body,
                    review_source=source,
                    validated=validated,
                    incentivized=incentivized
                )

                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['g2_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

        finally:
            # Release the browser after every attempt, successful or not.
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['g2_url']}")




def process_results(csv_file, location, retries=3):
    """Read the crawl report and run ``process_business`` on every row.

    Args:
        csv_file: Path to the CSV report produced by the crawl.
        location: Country code forwarded to ``process_business``.
        retries: Retry budget forwarded to ``process_business``.
    """
    logger.info(f"processing {csv_file}")
    # The report is written as UTF-8, so read it back with the same encoding
    # instead of the platform default.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_business(row, location, retries=retries)

if __name__ == "__main__":

    # Run configuration
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes: one CSV report per keyword
    for keyword in keyword_list:
        report_name = keyword.replace(" ", "-") + ".csv"

        crawl_pipeline = DataPipeline(csv_filename=report_name)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_name)
    logger.info("Crawl complete.")

    # Scrape the reviews for every business listed in each report
    for report in aggregate_files:
        process_results(report, LOCATION, retries=MAX_RETRIES)

Step 4: Adding Concurrency

To maximize speed and efficiency, we now need to add concurrency. The function below uses ThreadPoolExecutor in basically the same way we did earlier. The biggest difference is that we read our CSV file first.

def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl report and run ``process_business`` on its rows in a thread pool.

    Args:
        csv_file: Path to the CSV report produced by the crawl.
        location: Country code forwarded to ``process_business``.
        max_threads: Maximum number of worker threads.
        retries: Retry budget forwarded to ``process_business``.
    """
    logger.info(f"processing {csv_file}")
    # The report is written as UTF-8, so read it back with the same encoding
    # instead of the platform default.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # One task per row; location and retries are repeated for each call.
        executor.map(
            process_business,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )

We removed the for loop. The rest of our code remains pretty much the same.


Step 5: Bypassing Anti-Bots

Time to add proxy support again. We've already got the get_scrapeops_url() function so we just need to place it into our script.

driver.get(get_scrapeops_url(url, location=location))

Here is the fully updated code:

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` so the request is routed through the ScrapeOps proxy.

    Args:
        url: Target page to fetch.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint URL with the payload URL-encoded in the query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Honor the caller's location; it used to be hard-coded to "us".
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One business row taken from a G2 search results page."""

    name: str = ""
    stars: float = 0
    g2_url: str = ""
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty string fields a placeholder and strip whitespace from the rest."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values (e.g. a numeric rating) are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)

@dataclass
class ReviewData:
    """A single review record.

    After construction every field whose value is a string is stripped of
    surrounding whitespace, and blank values are replaced with the
    placeholder "No <field name>".
    """
    name: str = ""
    date: str = ""
    job_title: str = ""
    rating: float = 0
    full_review: str = ""
    review_source: str = ""
    validated: bool = False
    incentivized: bool = False

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Normalize string-valued fields in place (strip, then default)."""
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            # The check is on the runtime value, not the annotation, so
            # numbers and booleans are left untouched.
            if not isinstance(value, str):
                continue
            # Strip before testing for emptiness so whitespace-only input
            # also receives the placeholder instead of ending up as "".
            cleaned = value.strip()
            setattr(self, data_field.name, cleaned or f"No {data_field.name}")


class DataPipeline:
    """Collect scraped dataclass records, drop duplicates, and append them to a CSV file.

    Records are buffered in `storage_queue` and written out in batches once
    the queue reaches `storage_queue_limit`; `close_pipeline()` writes
    whatever is left.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        """
        Args:
            csv_filename: Path of the CSV file that batches are appended to.
            storage_queue_limit: Queue length that triggers a write.
        """
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file does not exist yet or
        is empty. Column names come from the dataclass fields of the first
        queued record.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always reset the flag: the early return above and write errors
            # used to leave it stuck at True, which blocked later flushes.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same `name` was already seen; otherwise remember the name."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a record unless it is a duplicate, flushing when the queue is full."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        queue_full = len(self.storage_queue) >= self.storage_queue_limit
        if queue_full and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write any records still in the queue."""
        if self.csv_file_open:
            # Imported here because the file's import block does not bring in
            # `time`; without it this branch raised NameError.
            import time

            # Pause while a write is flagged as in progress before flushing.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of G2 search results and feed each listing to the pipeline.

    Args:
        keyword: Search phrase; spaces are replaced with "+" in the query.
        location: Passed to get_scrapeops_url() when building the proxy URL.
        page_number: Zero-based page index (G2's `page` parameter is this + 1).
        data_pipeline: Object whose add_data() receives each SearchData item.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: When every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        # A fresh browser per attempt; it is always closed in `finally`.
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")

            for div_card in div_cards:
                name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
                g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")

                # Not every card has a rating element; fall back to 0.0.
                rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
                rating = rating_elements[0].text if rating_elements else 0.0

                description = div_card.find_element(By.CSS_SELECTOR, "p").text

                search_data = SearchData(
                    name=name.text,
                    stars=rating,
                    g2_url=g2_url,
                    description=description
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt. Without this the loop condition never
            # changes and a persistently failing page is retried forever.
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline<