How to Scrape G2 with Requests and BeautifulSoup
When it comes to online business, reputation is everything. Whether you're making a simple purchase or a long-term commitment such as choosing a new bank, you need a good understanding of anyone you decide to do business with. There are tons of review sites online, and G2 is one of the best. It gives us a treasure trove of information.
In this article, we're going to scrape tons of important data from G2.
- TLDR - How to Scrape G2
- How To Architect Our Scraper
- Understanding How To Scrape G2
- Setting Up Our G2 Scraper
- Build A G2 Search Crawler
- Build A G2 Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
If you prefer to follow along with a video then check out the video tutorial version here:
TLDR - How to Scrape G2
On G2, all of our data gets nested really deeply within HTML elements on the page. The script below finds the nested information on G2's search results page. It then generates a report for your search.
After writing the search report, the scraper goes through and generates detailed reports on each individual business we collected earlier.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")
for div_card in div_cards:
name = div_card.find("div", class_="product-listing__product-name")
g2_url = name.find("a").get("href")
has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0
if has_rating:
rating = has_rating.text
description = div_card.find("p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find("time")
if review_date:
date = review_date.get("datetime")
name_present = review_card.find("a", class_="link--header-color")
name = name_present.text if name_present else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1
job_title_present = review_card.find("div", class_="mt-4th")
job_title = job_title_present.text if job_title_present else "n/a"
rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
rating_div = rating_container.find("div")
rating_class = rating_div.get("class")
stars_string = rating_class[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2
review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text
info_container = review_card.find("div", class_="tags--teal")
incentives_dirty = info_container.find_all("div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean
review_data = ReviewData(
name=name,
date=date,
job_title=job_title,
rating=stars_clean_number,
full_review=review_body,
review_source=source,
validated=validated,
incentivized=incentivized
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
If you'd like to tweak this scraper, feel free to change any of the following below:
- keyword_list: Contains a list of keywords to be searched and scraped.
- MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
- PAGES: Specifies the number of pages to scrape for each keyword.
- LOCATION: Defines the geographic location from which the scraping requests appear to originate.
How To Architect Our G2 Scraper
In order to scrape G2 properly, we actually need to build two scrapers.
- Our first scraper is actually a crawler. The crawler performs a search and scrapes all of the relevant results from the search.
- After the crawler, we build our scraper. The scraper goes through and scrapes detailed information for all the individual businesses we collected with the crawler.
For example, if we search for online banks, the crawler generates a detailed list of online banks. The scraper then gets detailed reviews for each online bank.
For the best performance and stability, each of these scrapers will need the following:
- Parsing: so we can pull proper information from a page.
- Pagination: so we can pull up different pages and be more selective about our data.
- Data Storage: to store our data in a safe, efficient and readable way.
- Concurrency: to scrape multiple pages at once.
- Proxy Integration: when scraping anything at scale, we often face the issue of getting blocked. Proxies give us redundant connections and reduce the likelihood of getting blocked by different websites.
Understanding How To Scrape G2
Scraping G2 involves several key steps and considerations to ensure that the process is efficient and reliable:
Step 1: How To Request G2 Pages
When we perform a search on G2, our url looks like this:
https://www.g2.com/search?query=online+bank
https://www.g2.com/search? holds the first part of our url, and the query is tacked onto the end: query=online+bank. Additional parameters can be added to the url with &.
Take a look at the search below for online bank.
After we've parsed through our search results, we need to get a report about each business from the results. Each business has its own page on G2 and the url is typically constructed like this:
https://www.g2.com/products/name-of-business/reviews
Below is a screenshot of one of G2's individual business pages.
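As a quick illustration, here is a minimal sketch of how both URL patterns can be built in Python. The product slug used here ("bank-of-america") is just a hypothetical example:
# Build a G2 search URL for a keyword (spaces become "+").
keyword = "online bank"
search_url = f"https://www.g2.com/search?query={keyword.replace(' ', '+')}"
print(search_url)  # https://www.g2.com/search?query=online+bank

# Individual business pages follow the /products/<slug>/reviews pattern.
business_slug = "bank-of-america"  # hypothetical slug, for illustration only
reviews_url = f"https://www.g2.com/products/{business_slug}/reviews"
print(reviews_url)  # https://www.g2.com/products/bank-of-america/reviews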
Step 2: How To Extract Data From G2 Results and Pages
Our G2 data is very deeply nested within the page. Below is a shot of the name of a business nested within the page. All in all, the results page isn't too difficult.
Extracting data from the individual business pages is quite a bit harder. Take a look below:
If you look at the image above, the rating for the review isn't written out anywhere in the element's text.
- The key thing to pay attention to here is stars-8 at the end of the class name.
- The rating of the review is actually hidden within the CSS class: the 8 is our rating, but doubled.
- For instance, stars-10 would be a 5-star rating, stars-9 would be 4.5 stars, stars-8 would be 4 stars, and so on.
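To make that concrete, here is a small helper (a sketch, not part of the scraper itself) that converts a class name like stars-8 into an actual star rating:
def stars_class_to_rating(class_name: str) -> float:
    # "stars-8" -> 8.0 -> 4.0 stars, "stars-9" -> 4.5 stars, "stars-10" -> 5.0 stars
    doubled = float(class_name.split("-")[-1])
    return doubled / 2

print(stars_class_to_rating("stars-8"))   # 4.0
print(stars_class_to_rating("stars-9"))   # 4.5
print(stars_class_to_rating("stars-10"))  # 5.0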
Step 3: How To Control Pagination
In order to scrape at scale, we need to implement pagination. When we paginate our results, we fetch them in uniform sized batches.
If we want all the results from page 1, we fetch page 1. If we want page 2, we fetch page 2. We repeat this process until we've got all the data we want.
To add pagination to our search results, we add the page parameter. Our url, updated for pagination, will look like this:
https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}
As we discussed previously, our url for an individual business is set up like this:
https://www.g2.com/products/name-of-business/reviews
Now that we know how to format our urls, we're almost ready to extract our data.
Step 4: Geolocated Data
To handle geolocated data, we'll be using the ScrapeOps Proxy API. If we want to appear in Great Britain, we simply set our country parameter to "uk"; if we want to appear in the US, we can set this param to "us".
When we pass our country into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, our geolocation will show up correctly!
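Here is a rough sketch of how that country parameter gets passed through to the proxy, assuming the get_scrapeops_url() helper we build later in this article and a placeholder API key:
from urllib.parse import urlencode

API_KEY = "YOUR-SCRAPEOPS-API-KEY"  # placeholder

def get_scrapeops_url(url, location="us"):
    # The country parameter tells ScrapeOps which country to route the request through.
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

print(get_scrapeops_url("https://www.g2.com/search?query=online+bank", location="uk"))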
Setting Up Our G2 Scraper Project
Let's get started. You can run the following commands to get setup.
Create a New Project Folder
mkdir g2-scraper
cd g2-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
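The scripts in this article read the ScrapeOps API key from a config.json file in the project folder. Here is a minimal sketch of creating that file; the "api_key" field name is what the scraper expects, and the value shown is a placeholder:
import json

# Write a minimal config.json next to the scraper scripts.
with open("config.json", "w") as config_file:
    json.dump({"api_key": "YOUR-SCRAPEOPS-API-KEY"}, config_file, indent=2)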
Build a G2 Search Crawler
Step 1: Create Simple Search Data Parser
To get started, we need to create a parser for G2 search results. This parser is the bedrock of everything else we're going to do.
In the code below, we do the following:
- while we still have retries left and the operation hasn't succeeded, requests.get(url) fetches the site
- if we get a status_code of 200, we've got a successful response; any other status_code and we raise an Exception
- We then pull the name with div_card.find("div", class_="product-listing__product-name")
- name.find("a").get("href") gets the link to the business, g2_url
- If there is a rating present on the page, we pull it with has_rating.text. If there is no rating present, we give it a default rating of 0.0
- div_card.find("p").text gives us the description of the business
- Finally, we print all of this information to the terminal
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")
for div_card in div_cards:
name = div_card.find("div", class_="product-listing__product-name")
g2_url = name.find("a").get("href")
has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0
if has_rating:
rating = has_rating.text
description = div_card.find("p").text
search_data = {
"name": name.text,
"stars": rating,
"g2_url": g2_url,
"description": description
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
In the code above we pull all of our basic information about each business: name, stars, g2_url, and description.
With this information we can create uniform objects representing each business from the page.
Later on, this information goes a long way when generating our crawler report.
Step 2: Add Pagination
We're almost ready to store our data, but before we do, we need to add pagination. As mentioned before, we can paginate our results by simply changing our url.
Our new URL will look like this:
https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}
We use page_number+1 because start_scrape() uses a for loop that starts counting at zero.
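In other words, our zero-based loop maps cleanly onto G2's one-based page numbers. A quick illustration:
formatted_keyword = "online+bank"
for page_number in range(3):  # page_number is 0, 1, 2
    url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
    print(url)  # ...page=1..., ...page=2..., ...page=3...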
Take a look at the updated code below:
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")
for div_card in div_cards:
name = div_card.find("div", class_="product-listing__product-name")
g2_url = name.find("a").get("href")
has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0
if has_rating:
rating = has_rating.text
description = div_card.find("p").text
search_data = {
"name": name.text,
"stars": rating,
"g2_url": g2_url,
"description": description
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
In the code above, we added page_number to scrape_search_results(). We also added a start_scrape() function which gives us the ability to scrape multiple pages.
Later on, we'll add concurrency to this function, but for now, we're just going to use a for loop as a placeholder.
Step 3: Storing the Scraped Data
To store our data, we're going to utilize a couple of classes: SearchData and DataPipeline.
While they might look a bit intimidating, these classes are relatively simple.
- SearchData represents individual businesses.
- DataPipeline takes SearchData as input.
Once our DataPipeline takes in the SearchData, it compares each object by its name. If two objects have the same name, the second one gets dropped from the report. This simple approach goes a long way when filtering out duplicates.
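As a usage sketch (with hypothetical values, and assuming the SearchData and DataPipeline classes defined in the full code below), duplicates are dropped before anything gets written to the CSV:
pipeline = DataPipeline(csv_filename="example.csv")
pipeline.add_data(SearchData(name="Chime", stars=4.5, g2_url="https://www.g2.com/products/chime/reviews", description="Online bank"))
pipeline.add_data(SearchData(name="Chime", stars=4.5, g2_url="https://www.g2.com/products/chime/reviews", description="Duplicate entry"))  # logged as a duplicate and dropped
pipeline.close_pipeline()  # flushes the single remaining record to example.csv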
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")
for div_card in div_cards:
name = div_card.find("div", class_="product-listing__product-name")
g2_url = name.find("a").get("href")
has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0
if has_rating:
rating = has_rating.text
description = div_card.find("p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- DataPipeline creates a pipeline to a CSV file and filters out duplicates on the way to the file
- SearchData is used to represent business objects to put into the pipeline
Step 4: Adding Concurrency
For best performance, our crawler needs to utilize concurrency. In Python, we can achieve this through multithreading.
The function below uses ThreadPoolExecutor
to implement multithreading and crawl multiple pages simultaneously.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
Let's break down the arguments to executor.map():
- scrape_search_results tells the executor to run this function on each available thread
- [keyword] * pages passes our keyword into executor.map() as a list
- All of our other arguments are also passed in as lists
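If the repeated-list pattern looks odd, here is a tiny standalone example of the same idea. The fetch_page() function and its values are hypothetical; they just show how executor.map() zips the argument lists together:
import concurrent.futures

def fetch_page(keyword, location, page_number):
    # Stand-in for scrape_search_results(); it just reports the arguments it received.
    return f"{keyword} | {location} | page {page_number + 1}"

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        fetch_page,
        ["online bank"] * pages,  # the same keyword for every page
        ["us"] * pages,           # the same location for every page
        range(pages)              # page_number 0, 1, 2
    )
    for result in results:
        print(result)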
Here is the fully updated code.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")
for div_card in div_cards:
name = div_card.find("div", class_="product-listing__product-name")
g2_url = name.find("a").get("href")
has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0
if has_rating:
rating = has_rating.text
description = div_card.find("p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Now that we can crawl concurrently, we can process all of our information much faster.
Step 5: Bypassing Anti-Bots
Anti-bots are software systems designed to detect and block malicious traffic, protecting sites from threats such as DDOS attacks.
While it's not malicious, our crawler looks incredibly different from a normal user. At the moment, it can make dozens of requests in under a second. There is nothing human about that. In order to get past anti-bot software, we utilize the ScrapeOps API.
The function below uses simple string formatting and converts any regular url into a proxied one using the ScrapeOps API.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us"
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
The ScrapeOps Proxy API rotates our IP addresses and always gives us a server located in our country of choice.
Each request we make is coming from a different IP address, so instead of looking like one really abnormal user, our crawler looks like a bunch of different normal users.
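Using the helper is then just a matter of wrapping the target URL before handing it to requests. A quick sketch, assuming a valid API key in config.json and the get_scrapeops_url() function above:
import requests

target_url = "https://www.g2.com/search?query=online+bank"
response = requests.get(get_scrapeops_url(target_url, location="us"))
print(response.status_code)  # 200 on success; anything else triggers a retry in our scraper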
In this example, our code barely changes at all, but it brings us to a production ready level. Take a look at the full code example below.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")
for div_card in div_cards:
name = div_card.find("div", class_="product-listing__product-name")
g2_url = name.find("a").get("href")
has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0
if has_rating:
rating = has_rating.text
description = div_card.find("p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Time to run our crawler in production. Take a look at the main below; we're going to scrape 10 pages.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 10
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
PAGES has been set to 10 and LOCATION has been set to "us". Now let's see how long it takes to process 10 pages of data.
Here are the results:
All in all, it took roughly 23 seconds to process 10 pages of results...approximately 2.3 seconds per page.
Build A G2 Scraper
Our crawler generates detailed reports based on search criteria. Now that we can create a list of businesses, we need to get detailed information on each of those businesses. We do this by building a scraper for all of the individual businesses.
Our scraper will do the following:
- Open the report we created
- Get the pages from that report
- Pull information from these pages
- Create an individual report for each of the businesses we've looked up
Throughout this building process, we're going to once again utilize the following: parsing, storage, concurrency, and proxy integration.
Step 1: Create Simple Business Data Parser
Here, we'll just create a simple parsing function for our businesses. Take a look below.
def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find("time")
if review_date:
date = review_date.get("datetime")
name_present = review_card.find("a", class_="link--header-color")
name = name_present.text if name_present else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1
job_title_present = review_card.find("div", class_="mt-4th")
job_title = job_title_present.text if job_title_present else "n/a"
rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
rating_div = rating_container.find("div")
rating_class = rating_div.get("class")
stars_string = rating_class[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2
review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text
info_container = review_card.find("div", class_="tags--teal")
incentives_dirty = info_container.find_all("div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean
review_data = {
"name": name,
"date": date,
"job_title": job_title,
"rating": stars_clean_number,
"full_review": review_body,
"review_source": source,
"validated": validated,
"incentivized": incentivized
}
print("Review Data:", review_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
- All G2 reviews have a date; in each review, we pull the date with date = review_date.get("datetime")
- Then, we check if the user's name is present. If it's not, we name the reviewer "anonymous" and give them a number. This prevents different anonymous reviews from getting filtered out as duplicates
- job_title_present = review_card.find("div", class_="mt-4th") checks if the job_title is present. If it is not, we give it a default value of "n/a". Otherwise we pull the user's job_title from the post.
- rating_div.get("class") gets us the CSS class of the rating. We then split("-") to separate the number of stars from the CSS class. After splitting the stars, we divide the number by 2 to get the actual rating.
- review_card.find("div", attrs={"itemprop": "reviewBody"}).text gives us the actual review
- We created an incentives_dirty list to hold all of the incentive tags from the review. If "Review source:" is in the text of an incentive item, we split(": ") to separate the source name and pull it. All other non-duplicate items get pushed into the incentives_clean list.
- If "Validated Reviewer" or "Incentivized Review" is inside the incentives_clean list, we set those variables to True
This function takes in a row from our CSV file and then fetches the g2_url of the business. Once we can get the proper information from the site, we're ready to start reading our CSV file and scraping this valuable data.
Step 2: Loading URLs To Scrape
In order to use our process_business() function, we need to be able to read the rows from our CSV file. Now we're going to fully update our code.
Take a look at the function below:
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries)
This function reads the CSV file into a list of rows. For each row in the list, we pass that row into process_business(). You can view the fully updated code below.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us"
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")
for div_card in div_cards:
name = div_card.find("div", class_="product-listing__product-name")
g2_url = name.find("a").get("href")
has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0
if has_rating:
rating = has_rating.text
description = div_card.find("p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find("time")
if review_date:
date = review_date.get("datetime")
name_present = review_card.find("a", class_="link--header-color")
name = name_present.text if name_present else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1
job_title_present = review_card.find("div", class_="mt-4th")
job_title = job_title_present.text if job_title_present else "n/a"
rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
rating_div = rating_container.find("div")
rating_class = rating_div.get("class")
stars_string = rating_class[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2
review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text
info_container = review_card.find("div", class_="tags--teal")
incentives_dirty = info_container.find_all("div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean
review_data = {
"name": name,
"date": date,
"job_title": job_title,
"rating": stars_clean_number,
"full_review": review_body,
"review_source": source,
"validated": validated,
"incentivized": incentivized
}
print("Review Data:", review_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
In the example above, our process_results() function reads the rows from our CSV file and passes each of them into process_business(). process_business() then pulls our information and prints it to the terminal.
Step 3: Storing the Scraped Data
We're fetching the proper data once again. Now we need to store it. Our DataPipeline is already able to do this; we just need another @dataclass.
Take a look at the snippet below; it's our ReviewData.
@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Our ReviewData holds the following fields:
- name: str
- date: str
- job_title: str
- rating: float
- full_review: str
- review_source: str
- validated: bool
- incentivized: bool
In the updated code below, we create a new DataPipeline and pass our ReviewData object into it.
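Before the full listing, here is a quick sketch (hypothetical review values, assuming the ReviewData and DataPipeline classes shown in this article) of how each business gets its own pipeline and CSV file:
review_pipeline = DataPipeline(csv_filename="Example-Bank.csv")  # hypothetical business name
review_pipeline.add_data(ReviewData(
    name="Jane D.",
    date="2024-01-15",
    job_title="Accountant",
    rating=4.5,
    full_review="Great online banking experience.",
    review_source="Organic",
    validated=True,
    incentivized=False
))
review_pipeline.close_pipeline()  # writes the review to Example-Bank.csv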
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")
for div_card in div_cards:
name = div_card.find("div", class_="product-listing__product-name")
g2_url = name.find("a").get("href")
has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0
if has_rating:
rating = has_rating.text
description = div_card.find("p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find("time")
if review_date:
date = review_date.get("datetime")
name_present = review_card.find("a", class_="link--header-color")
name = name_present.text if name_present else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1
job_title_present = review_card.find("div", class_="mt-4th")
job_title = job_title_present.text if job_title_present else "n/a"
rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
rating_div = rating_container.find("div")
rating_class = rating_div.get("class")
stars_string = rating_class[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2
review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text
info_container = review_card.find("div", class_="tags--teal")
incentives_dirty = info_container.find_all("div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean
review_data = ReviewData(
name=name,
date=date,
job_title=job_title,
rating=stars_clean_number,
full_review=review_body,
review_source=source,
validated=validated,
incentivized=incentivized
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
Step 4: Adding Concurrency
It's now time to add concurrency to our scraper. We'll be able to run process_business() on multiple businesses at the same time.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
Other than this, the rest of our code remains mostly the same.
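If the way executor.map() lines up its arguments looks unfamiliar, here is a tiny, self-contained sketch (the function and list names are placeholders, not part of our scraper) showing how it pulls one element from each iterable per call:

import concurrent.futures

def label(name, location, page):
    # Each call receives one element from each iterable, in order
    return f"{name} | {location} | page {page}"

names = ["alpha", "beta", "gamma"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        label,
        names,                  # one name per call
        ["us"] * len(names),    # the same location repeated for every call
        range(len(names))       # page numbers 0, 1, 2
    )
    for line in results:
        print(line)

Each business gets its own call to process_business(), and the location and retry settings are simply repeated so every call receives the same values.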
Step 5: Bypassing Anti-Bots
Just like before, we need to add proxy support for bypassing anti-bots. We've already got the get_scrapeops_url() function, so we just need to place it into our script.
response = requests.get(get_scrapeops_url(url, location=location))
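If you want to sanity check what the wrapper is actually sending, you can print the proxied URL for a sample target. Here's a minimal sketch (the API key is a placeholder, and unlike the version above it passes location straight through to the country parameter):

from urllib.parse import urlencode

API_KEY = "YOUR-API-KEY"  # placeholder for illustration

def get_scrapeops_url(url, location="us"):
    payload = {"api_key": API_KEY, "url": url, "country": location}
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

print(get_scrapeops_url("https://www.g2.com/search?page=1&query=online+bank"))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-API-KEY&url=https%3A%2F%2Fwww.g2.com%2Fsearch%3Fpage%3D1%26query%3Donline%2Bbank&country=us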
Here is the fully updated code:
import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="product-listing mb-1 border-bottom")
for div_card in div_cards:
name = div_card.find("div", class_="product-listing__product-name")
g2_url = name.find("a").get("href")
has_rating = div_card.find("span", class_="fw-semibold")
rating = 0.0
if has_rating:
rating = has_rating.text
description = div_card.find("p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.find_all("div", class_="paper paper--white paper--box mb-2 position-relative border-bottom")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find("time")
if review_date:
date = review_date.get("datetime")
name_present = review_card.find("a", class_="link--header-color")
name = name_present.text if name_present else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1
job_title_present = review_card.find("div", class_="mt-4th")
job_title = job_title_present.text if job_title_present else "n/a"
rating_container = review_card.find("div", class_="f-1 d-f ai-c mb-half-small-only")
rating_div = rating_container.find("div")
rating_class = rating_div.get("class")
stars_string = rating_class[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2
review_body = review_card.find("div", attrs={"itemprop": "reviewBody"}).text
info_container = review_card.find("div", class_="tags--teal")
incentives_dirty = info_container.find_all("div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean
review_data = ReviewData(
name=name,
date=date,
job_title=job_title,
rating=stars_clean_number,
full_review=review_body,
review_source=source,
validated=validated,
incentivized=incentivized
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Time to test it all out in production! Once again, we'll update our main to crawl 10 pages.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 10
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
As before, I've set our PAGES to 10 and our LOCATION to "us". Here are the results.
It took just over 351 seconds (including the time needed to create our initial crawl report) to generate the full report and process all 196 results. That works out to 351 / 196, or roughly 1.79 seconds per business!
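Your numbers will vary with the proxy, your connection, and the number of reviews per business. If you want to time your own runs the same way, a simple timer around the job section is enough; here's a rough sketch (rows_processed is a placeholder you'd fill in from your own report):

import time

start = time.time()
# ... run start_scrape() and process_results() here, just like in the main block above ...
rows_processed = 196  # replace with the row count from your own report
elapsed = time.time() - start
print(f"Finished in {elapsed:.2f} seconds ({elapsed / rows_processed:.2f} seconds per business)")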
Legal and Ethical Considerations
When scraping any website, there are always legal and ethical considerations to take into account. You should always comply with a site's Terms of Use and its robots.txt. You can view G2's terms here, and their robots.txt is available here.
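If you'd like to check a path programmatically before crawling it, Python's standard library ships with urllib.robotparser. Here's a minimal sketch (the wildcard user agent is just an example):

from urllib.robotparser import RobotFileParser

robots = RobotFileParser()
robots.set_url("https://www.g2.com/robots.txt")
robots.read()

# True only if the given user agent is allowed to fetch that path
print(robots.can_fetch("*", "https://www.g2.com/search?query=online+bank"))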
Always be careful about the information you extract and don't scrape private or confidential data. If a website is hidden behind a login, that is generally considered private data.
If your data does not require a login, it is generally considered to be public data. If you have questions about the legality of your scraping job, it is best to consult an attorney familiar with the laws and localities you're dealing with.
Conclusion
You now know how to crawl and scrape G2. You should have a decent understanding of parsing, pagination, data storage, concurrency, and proxy integration, along with the find() method in BeautifulSoup and some fairly involved string operations for extracting data.
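As a quick recap of the pattern we leaned on throughout this article, here's a tiny, self-contained example of find() combined with the kind of class-name splitting we used for the star ratings (the HTML is made up for illustration):

from bs4 import BeautifulSoup

html = """
<div class="product-listing">
  <div class="product-listing__product-name"><a href="/products/example">Example Bank</a></div>
  <div class="stars stars-9"></div>
</div>
"""

soup = BeautifulSoup(html, "html.parser")
card = soup.find("div", class_="product-listing")
name = card.find("div", class_="product-listing__product-name").text.strip()
stars_class = card.find("div", class_="stars").get("class")[-1]  # "stars-9"
rating = float(stars_class.split("-")[-1]) / 2                   # 4.5
print(name, rating)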
If you'd like to learn more about the tools used in this article, take a look at the links below:
More Python Web Scraping Guides
Time to hone your new skills. Go build something! Here at ScrapeOps, we've got loads of resources for you to learn from. If you're in the mood to learn more, check our Python Web Scraping Playbook or take a look at the articles below: