How to Scrape G2 with Selenium
On G2, we can view in-depth, verified reviews for tons of different businesses. If you're looking for a solid review site, G2 is one of the best. It gives us a treasure trove of information, with all sorts of details about both the reviews and the reviewers.
By following along with this article, you'll be able to retrieve all sorts of data from G2 and you'll learn how to do the following when building scrapers in the future.
- TLDR - How to Scrape G2
- How To Architect Our Scraper
- Understanding How To Scrape G2
- Setting Up Our G2 Scraper
- Build A G2 Search Crawler
- Build A G2 Scraper
- Legal and Ethical Considerations
- Conclusion
- More Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape G2
When we need to scrape G2, all the information we want comes deeply nested within the HTML elements on the page. Extracting this information is difficult. Luckily, if you want a ready-to-go G2 scraper, we've got one for you right here. All you need to do is create a config.json
file with your ScrapeOps API key.
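If you haven't set one up before, config.json is just a small JSON file that holds your key. The script reads the api_key field, so a minimal example (with a placeholder value) looks like this:
{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}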
This script will perform a search based on any keywords in the keyword_list
and then generate a detailed report on businesses that match that keyword.
After generating the report, the scraper reads it and does a detailed search report on each individual business from the original report.
import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
for div_card in div_cards:
name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
has_rating = len(rating_elements) > 0
rating = 0.0
if has_rating:
rating = rating_elements[0].text
description = div_card.find_element(By.CSS_SELECTOR, "p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
driver.get(get_scrapeops_url(url, location=location))
try:
review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find_elements(By.CSS_SELECTOR, "time")
has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0
if len(review_date) > 0 and has_text:
date = review_date[0].get_attribute("datetime")
name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']")
name = name_array[0].text if len(name_array) > 0 else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1
job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']")
job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a"
rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']")
rating_div = rating_container.find_element(By.CSS_SELECTOR, "div")
rating_class = rating_div.get_attribute("class")
                    stars_string = rating_class.split("-")[-1]
                    stars_large_number = float(stars_string)
stars_clean_number = stars_large_number/2
review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text
info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']")
incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean
review_data = ReviewData(
name=name,
date=date,
job_title=job_title,
rating=stars_clean_number,
full_review=review_body,
review_source=source,
validated=validated,
incentivized=incentivized
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
If you'd like to tweak this scraper, feel free to change any of the following below:
- keyword_list: Contains a list of keywords to be searched and scraped.
- MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
- PAGES: Specifies the number of pages to scrape for each keyword.
- LOCATION: Defines the geographic location from which the scraping requests appear to originate.
How To Architect Our G2 Scraper
To scrape G2, we are actually going to build two different scrapers.
- Our first one will be the crawler. The crawler performs a search and parses the results. It then takes each business from these results and saves it to a CSV file.
- After the crawler, we build our scraper. The purpose of the scraper is to read the CSV file. After it reads the CSV file, the scraper will go through and scrape detailed reviews of every business collected during the crawl.
The crawler generates a detailed list of businesses. The scraper then gets detailed reviews for each business.
For the best performance and stability, each of these scrapers will need the following:
- Parsing: so we can pull proper information from a page.
- Pagination: so we can pull up different pages and be more selective about our data.
- Data Storage: to store our data in a safe, efficient and readable way.
- Concurrency: to scrape multiple pages at once.
- Proxy Integration: when scraping anything at scale, we often face the issue of getting blocked. Proxies allow us a redundant connection and reduce our likelihood of getting blocked by different websites.
Understanding How To Scrape G2
Step 1: How To Request G2 Pages
Here is a standard G2 URL:
https://www.g2.com/search?query=online+bank
https://www.g2.com/search? holds the first part of our URL. Our query gets added onto the end: query=online+bank. We can also add more parameters with &.
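To make that concrete, here's a minimal sketch of building a search URL in Python. The build_search_url() helper is just for illustration; the scrapers below use plain f-strings instead:
from urllib.parse import urlencode

def build_search_url(keyword, page=1):
    # urlencode turns spaces into "+" and joins parameters with "&"
    params = {"query": keyword, "page": page}
    return "https://www.g2.com/search?" + urlencode(params)

print(build_search_url("online bank"))
# https://www.g2.com/search?query=online+bank&page=1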
Take a look at the search below for online bank.
Once we've got our search results, we need to create a report about each business from those results. Each business has its own page on G2. The URL for each business is typically constructed like this:
https://www.g2.com/products/name-of-business/reviews
Below is a screenshot of one of G2's individual business pages.
Step 2: How To Extract Data From G2 Results and Pages
Our G2 data is nested very deeply within the page. Below is a screenshot of the name of a business nested within the page. All in all, the results page isn't too difficult to parse; we're only going to be taking 4 pieces of data from each result.
Extracting data from the individual business pages is much more difficult. Take a look below:
Pay close attention here:
- Specifically, look at stars-8 at the end of the class name.
- The rating of the review is actually hidden within this CSS class. The 8 is actually our 4.0 rating... doubled. stars-10 would be a 5-star rating, stars-9 would be 4.5 stars, stars-8 is 4 stars... you get the idea.
- The number in stars-number is always double the actual rating (a small conversion sketch follows this list).
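Here's a small sketch of that conversion, assuming the stars-N token sits at the end of the class string (class_to_rating() is a hypothetical helper, not part of the scraper code):
def class_to_rating(rating_class: str) -> float:
    # "stars-8" -> 8.0, then halve it to get the real rating
    stars_large_number = float(rating_class.split("-")[-1])
    return stars_large_number / 2

print(class_to_rating("stars-8"))   # 4.0
print(class_to_rating("stars-10"))  # 5.0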
Step 3: How To Control Pagination
Now, we need pagination. To paginate our results, we need to fetch them in uniform-sized batches.
If we want all the results from page 1, we fetch page 1. If we want page 2, we fetch page 2. We repeat this process until we've got our desired data. In order to accomplish this, we need to add the page parameter.
Our updated URL should look like this:
https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}
As mentioned earlier, our URL for an individual business gets constructed in this format:
https://www.g2.com/products/name-of-business/reviews
Once we're building our URLs properly, it's almost time to start fetching and parsing our data.
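Before moving on, here's a quick sketch of the page URLs we'll end up fetching for a three-page crawl (just printing them, no requests made):
keyword = "online bank"
formatted_keyword = keyword.replace(" ", "+")

# range() starts at 0, so we add 1 to match G2's 1-indexed pages
for page_number in range(3):
    print(f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}")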
Step 4: Geolocated Data
To handle geolocated data, we'll be using the ScrapeOps Proxy API. If we want to appear in Great Britain, we simply set our country parameter to "uk"; if we want to appear in the US, we set this param to "us".
When we pass our country into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, it will show up correctly!
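In code, this just means passing our location into the country field of the proxy payload. Here's a minimal sketch of the get_scrapeops_url() helper used throughout this article (with a placeholder API key):
from urllib.parse import urlencode

API_KEY = "YOUR-SCRAPEOPS-API-KEY"

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Route the same search through a UK-based server instead of a US one
print(get_scrapeops_url("https://www.g2.com/search?query=online+bank", location="uk"))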
Setting Up Our G2 Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir g2-scraper
cd g2-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install selenium
Make sure you have webdriver installed! If you don't, you can check here
Build A G2 Search Crawler
Step 1: Create Simple Search Data Parser
In order to pull the data from a search page, we need to parse it. We're going to get set up and write some code that does exactly this.
In the code below, we do the following:
- while we still have retries left and the operation hasn't succeeded, driver.get(url) fetches the site.
- We then pull the name with div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']").
- name.find_element(By.CSS_SELECTOR, "a").get_attribute("href") gets the link to the business, g2_url.
- If there is a rating present on the page, we pull it with rating_elements[0].text. If there is no rating present, we give it a default of 0.0.
- description = div_card.find_element(By.CSS_SELECTOR, "p").text gives us the description of the business.
- Finally, we print all of this information to the terminal.
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
driver.get(url)
driver.save_screenshot("test.png")
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
for div_card in div_cards:
name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
has_rating = len(rating_elements) > 0
rating = 0.0
if has_rating:
rating = rating_elements[0].text
description = div_card.find_element(By.CSS_SELECTOR, "p").text
search_data = {
"name": name.text,
"stars": rating,
"g2_url": g2_url,
"description": description
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
The code above finds and prints all the basic data for each business: name
, stars
, g2_url
, and description
. We'll use this information to create uniform objects representing each business from the search results.
This information is the very foundation for our crawler report.
Step 2: Add Pagination
It's almost time to store our data, but before we do this, we need pagination. As you probably remember, we can paginate our results by the url. Our new URL will look like this:
https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}
We use page_number+1
because start_scrape()
begins counting at zero.
Take a look at the updated code below:
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
driver.get(url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
for div_card in div_cards:
name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
has_rating = len(rating_elements) > 0
rating = 0.0
if has_rating:
rating = rating_elements[0].text
description = div_card.find_element(By.CSS_SELECTOR, "p").text
search_data = {
"name": name.text,
"stars": rating,
"g2_url": g2_url,
"description": description
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, max_threads=5, retries=3):
for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
In the example above, we added page_number
to scrape_search_results()
. We also added a start_scrape()
function which gives us the ability to scrape multiple pages.
Later on, we're going to add concurrency to this function. For the moment, we'll use a simple for loop as a placeholder.
Step 3: Storing the Scraped Data
For data storage, we need two classes: SearchData
and DataPipeline
. They might look a bit scary, but these classes are actually pretty simple.
- SearchData is used to represent individual business objects.
- The DataPipeline takes our SearchData as input.
- Once the DataPipeline takes in our SearchData, it compares each object by name.
- When two objects have the same name, the second one gets dropped. This approach works really well when removing duplicates (see the short usage sketch after this list).
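Here's a short usage sketch of that de-duplication, assuming the SearchData and DataPipeline classes from the listing below are already defined (the business values are made up):
pipeline = DataPipeline(csv_filename="example.csv")

pipeline.add_data(SearchData(name="Example Bank", stars=4.5, g2_url="https://www.g2.com/products/example-bank/reviews"))
# Same name again: logged as a duplicate and dropped
pipeline.add_data(SearchData(name="Example Bank", stars=4.5, g2_url="https://www.g2.com/products/example-bank/reviews"))

pipeline.close_pipeline()  # example.csv ends up with a single "Example Bank" row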
import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
driver.get(url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
for div_card in div_cards:
name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
has_rating = len(rating_elements) > 0
rating = 0.0
if has_rating:
rating = rating_elements[0].text
description = div_card.find_element(By.CSS_SELECTOR, "p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
for page in range(pages):
        scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- DataPipeline gives us an efficient pipeline to a CSV output file.
- SearchData objects become individual rows in our CSV file.
Step 4: Adding Concurrency
A for
loop isn't good enough if we want to run our crawler at scale in production.
The function below refactors our start_scrape()
function to use ThreadPoolExecutor
and take advantage of the multithreading offered by our CPU.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
Let's break down the arguments to executor.map():
- scrape_search_results tells the executor to run this function on every available thread.
- [keyword] * pages passes our keyword into executor.map() as a list.
- All of our other arguments get passed in as lists as well (see the standalone example after this list).
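If executor.map() with multiple iterables is new to you, here's a tiny standalone example of the same pattern:
import concurrent.futures

def greet(name, punctuation):
    return f"Hello, {name}{punctuation}"

names = ["page 1", "page 2", "page 3"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each call takes one item from each list: greet("page 1", "!"), greet("page 2", "!"), ...
    results = list(executor.map(greet, names, ["!"] * len(names)))

print(results)  # ['Hello, page 1!', 'Hello, page 2!', 'Hello, page 3!']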
Here is the fully updated code.
import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
driver.get(url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
for div_card in div_cards:
name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
has_rating = len(rating_elements) > 0
rating = 0.0
if has_rating:
rating = rating_elements[0].text
description = div_card.find_element(By.CSS_SELECTOR, "p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
We can now crawl pages concurrently, so all of our data comes in much faster.
Step 5: Bypassing Anti-Bots
Anti-bots are special software designed to detect and block malicious bots. They protect websites against things like DDoS attacks. Our crawler isn't malicious, but it looks drastically different from a normal user: it makes dozens of requests in under a second, and there is nothing human about that. In order to get past anti-bot software, we need the ScrapeOps Proxy API.
The function below uses simple string formatting to convert any regular URL into a proxied one using the ScrapeOps Proxy API.
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
The ScrapeOps Proxy API rotates IP addresses and always gives us a server located in a country
of our choice.
Each request we make is going to come from a different IP address. Instead of looking like one really weird user, our crawler looks like a random group of normal users.
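Inside scrape_search_results(), the only real change is wrapping the URL before handing it to Selenium. A minimal sketch, assuming get_scrapeops_url() and OPTIONS from above are in scope:
url = "https://www.g2.com/search?page=1&query=online+bank"

driver = webdriver.Chrome(options=OPTIONS)
driver.get(get_scrapeops_url(url, location="us"))  # fetch through the proxy, not directly
driver.quit()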
Beyond that, our code barely changes at all, but we're now at a production-ready level. Take a look at the full code example below.
import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
for div_card in div_cards:
name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
has_rating = len(rating_elements) > 0
rating = 0.0
if has_rating:
rating = rating_elements[0].text
description = div_card.find_element(By.CSS_SELECTOR, "p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Now, we'll run this crawler in production. Take a look at the main below; we're going to scrape 10 pages.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 10
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
PAGES
now gets set to 10
and LOCATION
gets set to "us"
. Now, we need to process 10 pages of data.
Here are the results:
All in all, it took roughly 30 seconds to process 10 pages of results...about 3 seconds per page.
Build A G2 Scraper
Our crawler builds reports based on our search criteria. Now that we're outputting a list of businesses, we need to extract detailed information about each of them. We can achieve this by building a scraper for these businesses.
Our scraper will do the following:
- Open the report we created
- Get the pages from that report
- Pull information from these pages
- Create an individual report for each of the businesses we've looked up
As we build the review scraper for this project, we'll once again use the following: parsing, storage, concurrency, and proxy integration.
Step 1: Create Simple Business Data Parser
Let's write a simple parser that reads a row of our CSV file and processes the business from that row. Take a look below.
def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
        driver.get(url)
try:
review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find_elements(By.CSS_SELECTOR, "time")
has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0
if len(review_date) > 0 and has_text:
date = review_date[0].get_attribute("datetime")
name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']")
name = name_array[0].text if len(name_array) > 0 else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1
job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']")
job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a"
rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']")
rating_div = rating_container.find_element(By.CSS_SELECTOR, "div")
rating_class = rating_div.get_attribute("class")
                    stars_string = rating_class.split("-")[-1]
                    stars_large_number = float(stars_string)
stars_clean_number = stars_large_number/2
review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text
info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']")
incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean
review_data = {
"name": name,
"date": date,
"job_title": job_title,
"rating": stars_clean_number,
"full_review": review_body,
"review_source": source,
"validated": validated,
"incentivized": incentivized
}
print(review_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
- Each review has a date. From each review, we pull the date with review_date[0].get_attribute("datetime").
- Then, we check if the user's name is present. If it's not, we name the reviewer "anonymous" and give them a number. This prevents different anonymous reviews from getting filtered out.
- if len(job_title_array) > 0 else "n/a" checks if the job_title is present. If it is not, we give it a default value of "n/a". Otherwise we pull the user's job_title from the post.
- rating_div.get_attribute("class") pulls the CSS class from our rating. We then split("-") to separate the number of stars from the CSS class. After splitting out the stars, we divide them by 2 to get the actual rating.
- review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text gives us the actual review.
- We created an incentives_dirty list to hold all of the incentive tags from the review. If "Review source:" is in the text of the incentive item, we split(": ") to separate the source name and pull it. All other non-duplicate items get pushed into the incentives_clean list (see the short sketch after this list).
- If "Validated Reviewer" or "Incentivized Review" is inside the incentives_clean list, we set those variables to True.
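For instance, the source extraction boils down to a simple string split. Here's a standalone sketch with made-up tag text:
# Separate the review source from the other incentive tags
tags = ["Validated Reviewer", "Review source: Organic", "Incentivized Review"]

incentives_clean = []
source = ""
for tag in tags:
    if "Review source:" in tag:
        source = tag.split(": ")[-1]
    else:
        incentives_clean.append(tag)

print(source)                                     # Organic
print("Validated Reviewer" in incentives_clean)   # True
print("Incentivized Review" in incentives_clean)  # True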
To summarize, our parsing function takes in a row
from our CSV file. Then, it fetches the g2_url
for the business. Now that we can extract the correct data from the site, we're ready to read our CSV file and scrape this valuable data.
Step 2: Loading URLs To Scrape
To use process_business()
, we need to read rows from the CSV file we created earlier. We're going to update our full code to do just that.
Take a look at the function below:
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
This function reads the CSV file and then converts all the rows into an array. As we iterate through the array, we pass each row into process_business()
. You can view the fully updated code below.
import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
for div_card in div_cards:
name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
has_rating = len(rating_elements) > 0
rating = 0.0
if has_rating:
rating = rating_elements[0].text
description = div_card.find_element(By.CSS_SELECTOR, "p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
        driver.get(url)
try:
review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find_elements(By.CSS_SELECTOR, "time")
has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0
if len(review_date) > 0 and has_text:
date = review_date[0].get_attribute("datetime")
name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']")
name = name_array[0].text if len(name_array) > 0 else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1
job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']")
job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a"
rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']")
rating_div = rating_container.find_element(By.CSS_SELECTOR, "div")
rating_class = rating_div.get_attribute("class")
                    stars_string = rating_class.split("-")[-1]
                    stars_large_number = float(stars_string)
stars_clean_number = stars_large_number/2
review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text
info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']")
incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean
review_data = {
"name": name,
"date": date,
"job_title": job_title,
"rating": stars_clean_number,
"full_review": review_body,
"review_source": source,
"validated": validated,
"incentivized": incentivized
}
print(review_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
In the code above, process_results()
reads rows from our CSV file. It then passes each of these rows into process_business()
. process_business()
extracts our data and then prints it to the terminal.
Step 3: Storing the Scraped Data
We're getting the data we need. Now we need to store it... Sound familiar? Our DataPipeline is already built for this, but we need another @dataclass. Take a look at the snippet below: it's our ReviewData.
@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Our ReviewData uses the following fields to represent reviews on the page (a quick usage sketch follows the list):
- name: str
- date: str
- job_title: str
- rating: float
- full_review: str
- review_source: str
- validated: bool
- incentivized: bool
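As with SearchData, any empty string field gets a readable default in __post_init__. A quick sketch, assuming the ReviewData class above is in scope:
review = ReviewData(name="anonymous-0", rating=4.0)

print(review.job_title)      # No job_title
print(review.review_source)  # No review_source
print(review.validated)      # False (boolean fields are left untouched)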
In the updated code below, we create a new DataPipeline
and pass our ReviewData
object into it.
import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
for div_card in div_cards:
name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
has_rating = len(rating_elements) > 0
rating = 0.0
if has_rating:
rating = rating_elements[0].text
description = div_card.find_element(By.CSS_SELECTOR, "p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
driver.get(url)
try:
review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find_elements(By.CSS_SELECTOR, "time")
has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0
if len(review_date) > 0 and has_text:
date = review_date[0].get_attribute("datetime")
name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']")
name = name_array[0].text if len(name_array) > 0 else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1
job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']")
job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a"
rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']")
rating_div = rating_container.find_element(By.CSS_SELECTOR, "div")
rating_class = rating_div.get_attribute("class")
# The rating is encoded in the last class token (e.g. 'stars-8', on a 0-10 scale)
stars_string = rating_class.split()[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2
review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text
info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']")
incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean
review_data = ReviewData(
name=name,
date=date,
job_title=job_title,
rating=stars_clean_number,
full_review=review_body,
review_source=source,
validated=validated,
incentivized=incentivized
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_business(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 4: Adding Concurrency
To maximize speed and efficiency, we now need to add concurrency. The function below uses `ThreadPoolExecutor` in basically the same way we did earlier. The biggest difference is that we read our CSV file first.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
We removed the `for` loop. The rest of our code remains pretty much the same.
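If the argument broadcasting looks odd, here is a tiny standalone sketch (toy function and data, not part of the scraper) showing how `executor.map()` zips its iterables together. This is exactly why we pass `[location] * len(reader)` and `[retries] * len(reader)` alongside the rows.

import concurrent.futures

def process_row(row, location, retries):
    return f"{row} | {location} | {retries}"

rows = ["row-1", "row-2", "row-3"]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Each call receives one element from each iterable:
    # process_row("row-1", "us", 3), process_row("row-2", "us", 3), ...
    results = list(executor.map(process_row, rows, ["us"] * len(rows), [3] * len(rows)))
print(results)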
Step 5: Bypassing Anti-Bots
Time to add proxy support again. We've already got the `get_scrapeops_url()` function, so we just need to place it into our script.
driver.get(get_scrapeops_url(url, location=location))
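You can sanity-check what the wrapped URL looks like by printing it. The output shown is illustrative; the key is a placeholder.

print(get_scrapeops_url("https://www.g2.com/search?query=online+bank"))
# Example output (with a placeholder key):
# https://proxy.scrapeops.io/v1/?api_key=YOUR-API-KEY&url=https%3A%2F%2Fwww.g2.com%2Fsearch%3Fquery%3Donline%2Bbank&country=us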
Here is the fully updated code:
import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": "us",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
g2_url: str = ""
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
job_title: str = ""
rating: float = 0
full_review: str = ""
review_source: str = ""
validated: bool = False
incentivized: bool = False
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.g2.com/search?page={page_number+1}&query={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")
## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='product-listing mb-1 border-bottom']")
for div_card in div_cards:
name = div_card.find_element(By.CSS_SELECTOR, "div[class='product-listing__product-name']")
g2_url = name.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
rating_elements = div_card.find_elements(By.CSS_SELECTOR, "span[class='fw-semibold']")
has_rating = len(rating_elements) > 0
rating = 0.0
if has_rating:
rating = rating_elements[0].text
description = div_card.find_element(By.CSS_SELECTOR, "p").text
search_data = SearchData(
name=name.text,
stars=rating,
g2_url=g2_url,
description=description
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_business(row, location, retries=3):
url = row["g2_url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=OPTIONS)
driver.get(get_scrapeops_url(url, location=location))
try:
review_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='paper paper--white paper--box mb-2 position-relative border-bottom']")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
anon_count = 0
for review_card in review_cards:
review_date = review_card.find_elements(By.CSS_SELECTOR, "time")
has_text = len(review_card.find_elements(By.CSS_SELECTOR, "div[itemprop='reviewBody']")) > 0
if len(review_date) > 0 and has_text:
date = review_date[0].get_attribute("datetime")
name_array = review_card.find_elements(By.CSS_SELECTOR, "a[class='link--header-color']")
name = name_array[0].text if len(name_array) > 0 else "anonymous"
if name == "anonymous":
name = f"{name}-{anon_count}"
anon_count += 1
job_title_array = review_card.find_elements(By.CSS_SELECTOR, "div[class='mt-4th']")
job_title = job_title_array[0].text if len(job_title_array) > 0 else "n/a"
rating_container = review_card.find_element(By.CSS_SELECTOR, "div[class='f-1 d-f ai-c mb-half-small-only']")
rating_div = rating_container.find_element(By.CSS_SELECTOR, "div")
rating_class = rating_div.get_attribute("class")
# The rating is encoded in the last class token (e.g. 'stars-8', on a 0-10 scale)
stars_string = rating_class.split()[-1]
stars_large_number = float(stars_string.split("-")[-1])
stars_clean_number = stars_large_number/2
review_body = review_card.find_element(By.CSS_SELECTOR, "div[itemprop='reviewBody']").text
info_container = review_card.find_element(By.CSS_SELECTOR, "div[class='tags--teal']")
incentives_dirty = info_container.find_elements(By.CSS_SELECTOR, "div")
incentives_clean = []
source = ""
for incentive in incentives_dirty:
if incentive.text not in incentives_clean:
if "Review source:" in incentive.text:
source = incentive.text.split(": ")[-1]
else:
incentives_clean.append(incentive.text)
validated = "Validated Reviewer" in incentives_clean
incentivized = "Incentivized Review" in incentives_clean
review_data = ReviewData(
name=name,
date=date,
job_title=job_title,
rating=stars_clean_number,
full_review=review_body,
review_source=source,
validated=validated,
incentivized=incentivized
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['g2_url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['g2_url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_business,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Let's run the whole thing in production! Like before, we'll update our `main` to crawl 10 pages.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 10
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["online bank"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Just like before, I set our `PAGES` to `10` and our `LOCATION` to `"us"`. Here are the results.
It took just over 478 seconds (including the time it took to create our initial report) to generate a full report and process all the results (196 rows). This comes out to an average speed of about 2.44 seconds per business.
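If you want to reproduce that kind of measurement on your own runs, a simple timer around the crawl and processing code in `__main__` is enough. Here's a minimal sketch; the `rows_processed` value is just the figure from the run above.

import time

start_time = time.time()
# ... run start_scrape() and process_results() here ...
total_seconds = time.time() - start_time

rows_processed = 196  # row count from the example run above
print(f"Total runtime: {total_seconds:.2f} seconds")
print(f"Average: {total_seconds / rows_processed:.2f} seconds per business")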
Legal and Ethical Considerations
Anytime you're scraping a website, there will always be legal and ethical considerations to think about. You should always comply with a site's Terms of Use and its `robots.txt`. You can view G2's terms here, and their `robots.txt` is available here.
Always be careful about the information you extract and don't scrape private or confidential data. If a website is hidden behind a login, that is generally considered private data.
If your data does not require a login, it is generally considered to be public data. If you have questions about the legality of your scraping job, it is best to consult an attorney familiar with the laws and localities you're dealing with.
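If you want to check a path programmatically before crawling it, Python's standard library includes a `robots.txt` parser. Here is a small sketch; the path shown is just an example.

from urllib.robotparser import RobotFileParser

robot_parser = RobotFileParser()
robot_parser.set_url("https://www.g2.com/robots.txt")
robot_parser.read()

# can_fetch() returns True only if the rules allow the given user agent to crawl the path
print(robot_parser.can_fetch("*", "https://www.g2.com/search?query=online+bank"))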
Conclusion
You now know how to crawl and scrape G2 using Python and Selenium. You should have a solid grasp of parsing, pagination, data storage, concurrency, and proxy integration. You should also have a decent understanding of Selenium's `find_element()` and `find_elements()` methods, as well as some fairly involved string operations for extracting data.
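As a quick refresher on that last point, the behavioral difference between the two methods is easy to demonstrate. This is a standalone sketch against a placeholder page, assuming a local Chrome/chromedriver setup.

from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()
driver.get("https://www.example.com")

# find_elements() returns a (possibly empty) list, so it's safe for optional elements
links = driver.find_elements(By.CSS_SELECTOR, "a")
print(f"Found {len(links)} links")

# find_element() returns a single element and raises NoSuchElementException if nothing matches
heading = driver.find_element(By.CSS_SELECTOR, "h1")
print(heading.text)

driver.quit()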
If you'd like to learn more about the tools used in this article, take a look at the links below:
More Web Scraping Guides
Now it's time to put your new skills into practice. Whatever you do... go build something! Here at ScrapeOps, we've got loads of resources for you to learn from.
If you're in the mood to learn more, check our Python Selenium Web Scraping Playbook or take a look at the articles below: