
How to Scrape G2 with Requests and BeautifulSoup

When it comes to online business, reputation is everything. Whether you're making a simple purchase or a long-term commitment such as choosing a new bank, you need a good understanding of anyone you decide to do business with. There are plenty of review sites online, and G2 is one of the best: it gives us a treasure trove of information.

In this article, we're going to scrape tons of important data from G2.

If you prefer to follow along with a video, check out the video tutorial version here:

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape G2

On G2, our data is nested deeply within the HTML elements of the page. The script below digs that nested information out of G2's search results page and generates a report for your search.

After writing the search report, the scraper goes through and generates a detailed report on each individual business we collected earlier.

import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")


for div_card in div_cards:

name = div_card.find("div", class_="product-listing__product-name")

g2_url = name.find("a").get("href")

has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0

if has_rating:
rating = has_rating.text

description = div_card.find("p").text

search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")


review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")


review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find("time")
if review_date:
date = review_date.get("datetime")
name_present = review_card.find("a", class_="link--header-color")
name = name_present.text if name_present else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1


job_title_present = review_card.find("div", class_="mt-4th")
job_title = job_title_present.text if job_title_present else "n/a"

rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
rating_div = rating_container.find("div")

rating_class = rating_div.get("class")

stars_string = rating_class[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2

review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text

info_container = review_card.find("div", class_="tags--teal")
incentives_dirty = info_container.find_all("div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean


review_data = ReviewData(
name=name,
date=date,
job_title=job_title,
rating=stars_clean_number,
full_review=review_body,
review_source=source,
validated=validated,
incentivized=incentivized
)

review_pipeline.add_data(review_data)


review_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

If you'd like to tweak this scraper, feel free to change any of the following below:

  • keyword_list: Contains a list of keywords to be searched and scraped.
  • MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
  • MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
  • PAGES: Specifies the number of pages to scrape for each keyword.
  • LOCATION: Defines the geographic location from which the scraping requests appear to originate.
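
The script also expects a config.json file in the same folder, holding the ScrapeOps API key that gets loaded at the top of the script. A minimal version of that file might look like this (the key below is just a placeholder):

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}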

How To Architect Our G2 Scraper

In order to scrape G2 properly, we actually need to build two scrapers.

  1. Our first scraper is actually a crawler. The crawler performs a search and scrapes all of the relevant results from the search.
  2. After the crawler, we build our scraper. The scraper goes through and scrapes detailed information for all the individual businesses we collected with the crawler.

For example, if we search for online banks, the crawler generates a detailed list of online banks. The scraper then gets detailed reviews for each online bank.

For the best performance and stability, each of these scrapers will need the following:

  • Parsing: so we can pull proper information from a page.
  • Pagination: so we can pull up different pages and be more selective about our data.
  • Data Storage: to store our data in a safe, efficient and readable way.
  • Concurrency: to scrape multiple pages at once.
  • Proxy Integration: when scraping anything at scale, we often face the issue of getting blocked. Proxies give us redundant connections and make it far less likely that any individual website blocks us.

Understanding How To Scrape G2

Scraping G2 involves several key steps and considerations to ensure that the process is efficient and reliable:

Step 1: How To Request G2 Pages

When we perform a search on G2, our url looks like this:

https://www.g2.com/search?query=online+bank

https://www.g2.com/search? holds the first part of our url and the query is tacked onto the end: query=online+bank. Additional parameters can be added to the url with &.
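
If you'd rather not build these query strings by hand, Python's urlencode() (which the full script already imports) can assemble them for you. Here's a quick sketch; the build_search_url() helper is just for illustration:

from urllib.parse import urlencode

def build_search_url(query, page=1):
    # urlencode handles spaces and special characters for us
    params = {"query": query, "page": page}
    return "https://www.g2.com/search?" + urlencode(params)

print(build_search_url("online bank"))
# https://www.g2.com/search?query=online+bank&page=1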

Take a look at the search below for online bank.

G2 Search Results

After we've parsed through our search results, we need to get a report about each business from the results. Each business has its own page on G2 and the url is typically constructed like this:

https://www.g2.com/products/name-of-business/reviews

Below is a screenshot of one of G2's individual business pages.

G2 Business Details Page


Step 2: How To Extract Data From G2 Results and Pages

Our G2 data is very deeply nested within the page. Below is a screenshot of a business name nested within the page. All in all, the results page isn't too difficult to parse.

g2 HTML Inspection

Extracting data from the individual business pages is quite a bit harder. Take a look below:

g2 HTML Inspection Business Page

If you look at the image above, the rating for the review doesn't appear in the text of the element at all.

  • The key thing to pay attention to here is stars-8 at the end of the class name.
  • The rating of the review is actually hidden within the CSS class. The 8 is actually our rating but doubled.
  • For instance, stars-10 would be a 5-star rating, stars-9 would be 4.5 stars, stars-8 would be 4 stars, and so on (see the short sketch below).
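
Here is a small sketch of that conversion, using the same split-and-divide trick the scraper uses later on:

# the last CSS class on the rating div might be something like "stars-8"
stars_class = "stars-8"
stars_doubled = float(stars_class.split("-")[-1])  # 8.0
rating = stars_doubled / 2
print(rating)  # 4.0 stars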

Step 3: How To Control Pagination

In order to scrape at scale, we need to implement pagination. When we paginate our results, we fetch them in uniformly sized batches.

If we want all the results from page 1, we fetch page 1. If we want page 2, we fetch page 2. We repeat this process until we've got all the data we want.

To add pagination to our search results, we add the page parameter. Our URL updated for pagination will look like this:

https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}

As we discussed previously, our url for an individual business is set up like this:

https://www.g2.com/products/name-of-business/reviews

Now that we know how to format our urls, we're almost ready to extract our data.
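
To see what those paginated search URLs look like in practice, here is a tiny sketch that mirrors the formatting our crawler uses (the keyword and page count are just examples):

formatted_keyword = "online bank".replace(" ", "+")

# page_number starts at 0, so we add 1 to match G2's 1-based page numbers
for page_number in range(3):
    print(f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}")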


Step 4: Geolocated Data

To handle geolocated data, we'll be using the ScrapeOps Proxy API. If we want to appear in Great Britain, we simply set our country parameter to "uk"; if we want to appear in the US, we set this param to "us".

When we pass our country into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, our geolocation will show up correctly!
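
Concretely, the only thing that changes between locations is the country field we send to the proxy. For example, a payload routed through Great Britain might look roughly like this (the API key is a placeholder):

from urllib.parse import urlencode

payload = {
    "api_key": "YOUR-SCRAPEOPS-API-KEY",  # placeholder
    "url": "https://www.g2.com/search?query=online+bank",
    "country": "uk",  # ask ScrapeOps to route this request through Great Britain
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)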


Setting Up Our G2 Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir g2-scraper

cd g2-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build a G2 Search Crawler

Step 1: Create Simple Search Data Parser

To get started, we need to create a parser for G2 search results. This parser is the bedrock of everything else we're going to do.

In the code below, we do the following:

  • while we still have retries left and the operation hasn't succeeded:
    • requests.get(url) fetches the site
    • if we get a status_code of 200, we've got a successful response; if we get any other status_code, we raise an Exception
    • We then pull the name with div_card.find("div", class_="product-listing__product-name")
    • name.find("a").get("href") gets the link to the business, g2_url
    • If there is a rating present on the page, we pull it from the page with has_rating.text. If there is no rating present, we give it a default rating of 0.0
    • div_card.find("p").text gives us the description of the business
    • Finally, we print all of this information to the terminal
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?query={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")


for div_card in div_cards:

name = div_card.find("div", class_="product-listing__product-name")

g2_url = name.find("a").get("href")

has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0

if has_rating:
rating = has_rating.text

description = div_card.find("p").text

search_data = {
"name": name.text,
"stars": rating,
"g2_url": g2_url,
"description": description
}


print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")

In the code above, we pull all of our basic information about each business: name, stars, g2_url, and description.

With this information we can create uniform objects representing each business from the page.

Later on, this information goes a long way when generating our crawler report.


Step 2: Add Pagination

We're almost ready to store our data, but before we do, we need to add pagination. As mentioned before, we can paginate our results by simply changing our url.

Our new URL will look like this:

https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}

We use page_number+1 because start_scrape() uses a for loop that starts counting at zero.

Take a look at the updated code below:

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")


for div_card in div_cards:

name = div_card.find("div", class_="product-listing__product-name")

g2_url = name.find("a").get("href")

has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0

if has_rating:
rating = has_rating.text

description = div_card.find("p").text

search_data = {
"name": name.text,
"stars": rating,
"g2_url": g2_url,
"description": description
}


print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

In the code above, we added page_number to scrape_search_results(). We also added a start_scrape() function which gives us the ability to scrape multiple pages.

Later on, we'll add concurrency to this function, but for now, we're just going to use a for loop as a placeholder.


Step 3: Storing the Scraped Data

To store our data, we're going to utilize a couple of classes: SearchData and DataPipeline.

While they might look a bit intimidating, these classes are relatively simple.

  • SearchData represents individual businesses.
  • DataPipeline takes SearchData as input.

Once our DataPipeline takes in the SearchData, it compares each object by its name. If two objects have the same name, the second one gets dropped from the report. This simple approach goes a long way when filtering out duplicates.
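
As a quick illustration (the business values below are made up), adding two objects with the same name only stores the first one; the second is logged as a duplicate and dropped:

pipeline = DataPipeline(csv_filename="example.csv")

pipeline.add_data(SearchData(name="Some Bank", stars=4.5, g2_url="https://www.g2.com/products/some-bank/reviews", description="An online bank."))
pipeline.add_data(SearchData(name="Some Bank", stars=4.5, g2_url="https://www.g2.com/products/some-bank/reviews", description="An online bank."))  # dropped as a duplicate

pipeline.close_pipeline()  # flushes anything still in the queue out to example.csv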

import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())



class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")


for div_card in div_cards:

name = div_card.find("div", class_="product-listing__product-name")

g2_url = name.find("a").get("href")

has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0

if has_rating:
rating = has_rating.text

description = div_card.find("p").text

search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • DataPipeline creates a pipeline to a CSV file and filters out duplicates on the way to the file
  • SearchData is used to represent business objects to put into the pipeline

Step 4: Adding Concurrency

For best performance, our crawler needs to utilize concurrency. In Python, we can achieve this through multithreading.

The function below uses ThreadPoolExecutor to implement multithreading and crawl multiple pages simultaneously.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

Let's break down the arguments to executor.map():

  • scrape_search_results tells the executor which function to run on each thread
  • [keyword] * pages passes our keyword into executor.map() as a list, one copy per page
  • All of our other arguments are also passed in as lists of the same length (see the small example below)
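
If those list arguments look odd, here is a tiny standalone example of the same pattern: executor.map() zips the iterables together and calls the function once per position, each call on its own thread.

import concurrent.futures

def show(keyword, page):
    print(f"scraping '{keyword}' page {page+1}")

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(show, ["online bank"] * 3, range(3))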

Here is the fully updated code.

import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())



class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")


for div_card in div_cards:

name = div_card.find("div", class_="product-listing__product-name")

g2_url = name.find("a").get("href")

has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0

if has_rating:
rating = has_rating.text

description = div_card.find("p").text

search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Now that we can crawl concurrently, we can process all of our information much faster.


Step 5: Bypassing Anti-Bots

Anti-bots are software systems designed to detect and block malicious traffic. They exist to protect websites against threats such as DDOS attacks.

While it's not malicious, our crawler looks incredibly different from a normal user. At the moment, it can make dozens of requests in under a second. There is nothing human about that. In order to get past anti-bot software, we utilize the ScrapeOps API.

The function below uses simple string formatting and converts any regular url into a proxied one using the ScrapeOps API.

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us"
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

The ScrapeOps Proxy API rotates our IP addresses and always gives us a server located in our country of choice.

Each request we make is coming from a different IP address, so instead of looking like one really abnormal user, our crawler looks like a bunch of different normal users.
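
Using the proxy is as simple as wrapping the target url before handing it to requests.get(). Assuming the get_scrapeops_url() function above, the call pattern looks roughly like this:

# wrap the target url, then request the proxied version instead
target_url = "https://www.g2.com/search?query=online+bank"
response = requests.get(get_scrapeops_url(target_url, location="us"))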

In this example, our code barely changes at all, but it brings us up to a production-ready level. Take a look at the full code example below.

import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())



class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")


for div_card in div_cards:

name = div_card.find("div", class_="product-listing__product-name")

g2_url = name.find("a").get("href")

has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0

if has_rating:
rating = has_rating.text

description = div_card.find("p").text

search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

Time to run our crawler in production. Take a look at the main block below; we're going to scrape 10 pages.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 10
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

PAGES has been set to 10 and LOCATION has been set to "us". Now let's see how long it takes to process 10 pages of data.

Here are the results:

Crawler Results

All in all, it took roughly 23 seconds to process 10 pages of results...approximately 2.3 seconds per page.


Build A G2 Scraper

Our crawler generates detailed reports based on search criteria. Now that we can create a list of businesses, we need to get detailed information on each of those businesses. We do this by building a scraper for all of the individual businesses.

Our scraper will do the following:

  1. Open the report we created
  2. Get the pages from that report
  3. Pull information from these pages
  4. Create an individual report for each of the businesses we've looked up

Throughout this building process, we're going to once again utilize the following: parsing, storage, concurrency, and proxy integration.


Step 1: Create Simple Business Data Parser

Here, we'll just create a simple parsing function for our businesses. Take a look below.

def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")


review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")

anon_count = 0
for review_card in review_cards:
review_date = review_card.find("time")
if review_date:
date = review_date.get("datetime")
name_present = review_card.find("a", class_="link--header-color")
name = name_present.text if name_present else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1


job_title_present = review_card.find("div", class_="mt-4th")
job_title = job_title_present.text if job_title_present else "n/a"

rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
rating_div = rating_container.find("div")

rating_class = rating_div.get("class")

stars_string = rating_class[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2

review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text

info_container = review_card.find("div", class_="tags--teal")
incentives_dirty = info_container.find_all("div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean


review_data = {
"name": name,
"date": date,
"job_title": job_title,
"rating": stars_clean_number,
"full_review": review_body,
"review_source": source,
"validated": validated,
"incentivized": incentivized
}

print("Review Data:", review_data)


success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
  • All G2 reviews have a date, in each review, we pull the date with date = review_date.get("datetime")
  • Then, we check if the user's name is present. If it's not, we name the viewer, "anonymous" and give them a number. This prevents different anonymous reviews from getting filtered out
  • job_title_present = review_card.find("div", class_="mt-4th") checks if the job_title is present. If it is not, we give it a default value of "n/a". Otherwise we pull the user's job_title from the post.
  • rating_div.get("class") gets us the CSS class of the rating. We then split("-") to separate the number of stars from the CSS class. After splitting the stars, we divide them by 2 to get the actual rating.
  • review_card.find("div", attrs={"itemprop": "reviewBody"}).text gives us the actual review
  • We create an incentives_dirty list to hold all of the incentive tags from the review. If "Review source:" is in the text of an incentive item, we split(": ") to separate out the source name and pull it. All other non-duplicate items get pushed into the incentives_clean list (see the sketch after this list).
  • If "Validated Reviewer" or "Incentivized Review" is inside the incentives_clean list, we set those variables to True

This function takes in a row from our CSV file and then fetches the g2_url of the business. Once we can get the proper information from the site, we're ready to start reading our CSV file and scraping this valuable data.


Step 2: Loading URLs To Scrape

In order to use our process_business() function, we need to be able to read the rows from our CSV file. Now we're going to fully update our code.

Take a look at the function below:

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_business(row, location, retries)

This function reads the CSV file into a list of rows. For each row in the list, we pass that row into process_business(). You can view the fully updated code below.

import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us"
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"

tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")


for div_card in div_cards:

name = div_card.find("div", class_="product-listing__product-name")

g2_url = name.find("a").get("href")

has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0

if has_rating:
rating = has_rating.text

description = div_card.find("p").text

search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")


review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")

anon_count = 0
for review_card in review_cards:
review_date = review_card.find("time")
if review_date:
date = review_date.get("datetime")
name_present = review_card.find("a", class_="link--header-color")
name = name_present.text if name_present else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1


job_title_present = review_card.find("div", class_="mt-4th")
job_title = job_title_present.text if job_title_present else "n/a"

rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
rating_div = rating_container.find("div")

rating_class = rating_div.get("class")

stars_string = rating_class[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2

review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text

info_container = review_card.find("div", class_="tags--teal")
incentives_dirty = info_container.find_all("div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean


review_data = {
"name": name,
"date": date,
"job_title": job_title,
"rating": stars_clean_number,
"full_review": review_body,
"review_source": source,
"validated": validated,
"incentivized": incentivized
}

print("Review Data:", review_data)


success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_business(row, location, retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

In the example above, our process_results() function reads the rows from our CSV file and passes each of them into process_business(). process_business() then pulls our information and prints it to the terminal.


Step 3: Storing the Scraped Data

We're fetching the proper data once again. Now we need to store it. Our DataPipeline is already able to do this; we just need another @dataclass.

Take a look at the snippet below, it's our ReviewData.

@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

Our ReviewData holds the following fields:

  • name: str
  • date: str
  • job_title: str
  • rating: float
  • full_review: str
  • review_source: str
  • validated: bool
  • incentivized: bool
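
Like SearchData, the string fields clean themselves up on creation: an empty string becomes a "No <field>" placeholder and surrounding whitespace gets stripped. A quick sketch, assuming ReviewData is defined as above (the values are made up):

review = ReviewData(name="  Jane Doe  ", job_title="", rating=4.5)

print(review.name)       # "Jane Doe" -- whitespace stripped
print(review.job_title)  # "No job_title" -- empty string replaced with a default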

In the updated code below, we create a new DataPipeline and pass our ReviewData object into it.

import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")


for div_card in div_cards:

name = div_card.find("div", class_="product-listing__product-name")

g2_url = name.find("a").get("href")

has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0

if has_rating:
rating = has_rating.text

description = div_card.find("p").text

search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")


review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")


review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find("time")
if review_date:
date = review_date.get("datetime")
name_present = review_card.find("a", class_="link--header-color")
name = name_present.text if name_present else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1


job_title_present = review_card.find("div", class_="mt-4th")
job_title = job_title_present.text if job_title_present else "n/a"

rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
rating_div = rating_container.find("div")

rating_class = rating_div.get("class")

# The rating lives in the last CSS class (e.g. "stars-8"); the number is
# twice the star rating, so dividing by 2 gives the familiar 0-5 value.
stars_string = rating_class[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2

review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text

info_container = review_card.find("div", class_="tags--teal")
incentives_dirty = info_container.find_all("div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean


review_data = ReviewData(
name=name,
date=date,
job_title=job_title,
rating=stars_clean_number,
full_review=review_body,
review_source=source,
validated=validated,
incentivized=incentivized
)

review_pipeline.add_data(review_data)


review_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_business(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 4: Adding Concurrency

Now it's time to add concurrency to our scraper so we can run process_business() on multiple businesses at the same time. All we need to do is swap the for loop inside process_results() for a ThreadPoolExecutor.

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)

Other than this, the rest of our code remains mostly the same.
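If the executor.map() call looks unfamiliar, here's a minimal standalone sketch of the same pattern. The fake_process_business() function and the sample rows are made up purely for illustration; each iterable supplies one positional argument per call, which is why location and retries are repeated once per row in the real scraper.

from concurrent.futures import ThreadPoolExecutor

# fake_process_business() is a made-up stand-in for process_business();
# it just echoes its arguments so we can see how map() lines them up.
def fake_process_business(row, location, retries):
    return f"{row['name']} | {location} | retries={retries}"

rows = [{"name": "bank-a"}, {"name": "bank-b"}, {"name": "bank-c"}]

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        fake_process_business,
        rows,
        ["us"] * len(rows),
        [3] * len(rows)
    )
    for result in results:
        print(result)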


Step 5: Bypassing Anti-Bots

Just like before, we need proxy support to get past G2's anti-bot protection. We've already written the get_scrapeops_url() function, so all that's left is to wrap the URL we fetch inside process_business() with it.

response = requests.get(get_scrapeops_url(url, location=location))
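If you want to confirm what the proxied request will actually look like before running the full scraper, you can print the wrapped URL. This is just a quick sanity check, and it assumes API_KEY has already been loaded from config.json as shown earlier.

# Quick sanity check: print the proxied URL for a sample G2 search page.
test_url = "https://www.g2.com/search?page=1&query=online+bank"
print(get_scrapeops_url(test_url, location="us"))
# Prints something like:
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.g2.com%2Fsearch%3Fpage%3D1%26query%3Donline%2Bbank&country=us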

Here is the fully updated code:

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")


for div_card in div_cards:

name = div_card.find("div", class_="product-listing__product-name")

g2_url = name.find("a").get("href")

has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0

if has_rating:
rating = has_rating.text

description = div_card.find("p").text

search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")


review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")


review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find("time")
if review_date:
date = review_date.get("datetime")
name_present = review_card.find("a", class_="link--header-color")
name = name_present.text if name_present else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1


job_title_present = review_card.find("div", class_="mt-4th")
job_title = job_title_present.text if job_title_present else "n/a"

rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
rating_div = rating_container.find("div")

rating_class = rating_div.get("class")

# The rating lives in the last CSS class (e.g. "stars-8"); the number is
# twice the star rating, so dividing by 2 gives the familiar 0-5 value.
stars_string = rating_class[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2

review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text

info_container = review_card.find("div", class_="tags--teal")
incentives_dirty = info_container.find_all("div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean


review_data = ReviewData(
name=name,
date=date,
job_title=job_title,
rating=stars_clean_number,
full_review=review_body,
review_source=source,
validated=validated,
incentivized=incentivized
)

review_pipeline.add_data(review_data)


review_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Time to test it all out in production! Once again, we'll update our main to crawl 10 pages.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 10
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

As before, I've set our PAGES to 10 and our LOCATION to "us". Here are the results.

Scraper Results

It took just over 351 seconds (including the time it took to create our initial report) to generate a full report and process all the results (196 rows). This comes out to a speed of about 1.79 seconds per business!
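If you'd like to double check that throughput figure, the math is a one-liner. The 351 seconds and 196 rows are simply the numbers from this particular run; yours will differ.

total_seconds = 351    # full run: crawl plus processing every business
rows_processed = 196   # businesses found during the crawl
print(round(total_seconds / rows_processed, 2))   # 1.79 seconds per business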


Legal and Ethical Considerations

When scraping any website, there are always legal and ethical considerations to take into account. You should always comply with a site's Terms of Use and its robots.txt.

You can view G2's terms here and their robots.txt is available here.

Always be careful about the information you extract and don't scrape private or confidential data. If a website is hidden behind a login, that is generally considered private data.

If your data does not require a login, it is generally considered to be public data. If you have questions about the legality of your scraping job, it is best to consult an attorney familiar with the laws and localities you're dealing with.


Conclusion

You now know how to crawl and scrape G2. You should have a decent understanding of parsing, pagination, data storage, concurrency, and proxy integration, as well as BeautifulSoup's find() method and the string operations we used to pull data out of deeply nested elements.

If you'd like to learn more about the tools used in this article, take a look at the links below:


More Python Web Scraping Guides

Time to hone your new skills. Go build something! Here at ScrapeOps, we've got loads of resources for you to learn from. If you're in the mood to learn more, check out our Python Web Scraping Playbook or take a look at the articles below: