How to Scrape Trustpilot with Requests and BeautifulSoup
In today's world, pretty much everything is online. This is especially true when it comes to business. Trustpilot allows users to rate and review various businesses, making it an excellent resource for evaluating companies you're uncertain about.
In this tutorial, we're going to go over the finer points of scraping Trustpilot and generating some detailed reports.
- TLDR How to Scrape Trustpilot
- How To Architect Our Scraper
- Understanding How To Scrape Trustpilot
- Setting Up Our Trustpilot Scraper
- Build A Trustpilot Search Crawler
- Build A Trustpilot Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Trustpilot
When scraping Trustpilot, all of our important data gets embedded in a JSON blob on the page. This goes for both the search results page and individual business pages.
If we can pull and parse the JSON, we can get all the information we want to scrape.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Placeholder so the name exists before config.json has been read.
API_KEY = ""

# The ScrapeOps key is stored in config.json under the "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` in a ScrapeOps proxy URL.

    Args:
        url: The page to fetch through the proxy.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the API key, target URL and country
        URL-encoded into its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Previously hard-coded to "us", which silently ignored the argument.
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
## Logging
# Configure the root logger at INFO level and create the module-level logger
# that the rest of this script writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business taken from a Trustpilot search-results page.

    After construction every string attribute is normalised: an empty string
    becomes ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill in empty string attributes and strip the non-empty ones."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            if not isinstance(current, str):
                continue
            if current == "":
                # Empty values get a readable placeholder.
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, current.strip())
@dataclass
class ReviewData:
    """One customer review taken from a Trustpilot business page.

    After construction every string attribute is normalised: an empty string
    becomes ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    name: str = ""
    rating: float = 0
    text: str = ""
    title: str = ""
    date: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill in empty string attributes and strip the non-empty ones."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            if not isinstance(current, str):
                continue
            if current == "":
                # Empty values get a readable placeholder.
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, current.strip())
class DataPipeline:
    """Buffer scraped dataclass records and append them to a CSV file in batches.

    Records are de-duplicated on their ``name`` attribute, queued in memory,
    and flushed to ``csv_filename`` whenever the queue reaches
    ``storage_queue_limit`` or when ``close_pipeline`` is called.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Always clear the flag; the early return on an empty queue used to
            # leave it stuck at True, which blocked every later automatic flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same ``name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once when scraping is finished."""
        if self.csv_file_open:
            # Imported here because the file never imports `time` at the top,
            # so the original call raised NameError.
            import time

            # Give a write that is still in progress a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of Trustpilot search results for ``keyword``.

    The page is requested through ``get_scrapeops_url``, the JSON held in the
    ``__NEXT_DATA__`` script tag is parsed, and one ``SearchData`` record per
    business is handed to ``data_pipeline``.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` for the URL.
        location: Country code forwarded to ``get_scrapeops_url``.
        page_number: Zero-based page index (the URL uses ``page_number + 1``).
        data_pipeline: Object with an ``add_data`` method. When ``None`` the
            records are built but not stored.
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    contact = business.get("contact") or {}
                    website = contact.get("website") or ""
                    if "://" not in website:
                        # The review URL is derived from the website, so a
                        # record without one cannot be followed up.
                        logger.warning(f"Skipping business without a usable website: {business.get('displayName')}")
                        continue
                    trustpilot_formatted = website.split("://")[1]
                    # Kept in its own name so the `location` argument is still
                    # the country code if the request has to be retried.
                    business_location = business.get("location") or {}
                    category_list = business.get("categories") or []
                    category = category_list[0]["categoryId"] if category_list else "n/a"

                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=website,
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=business_location.get("country", "n/a"),
                        category=category,
                    )
                    if data_pipeline is not None:
                        data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Only mark success once parsing is done, so a parse error is retried.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Without this increment a failing page was retried forever.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search-result pages for ``keyword`` on a thread pool.

    Args:
        keyword: Search phrase passed to ``scrape_search_results``.
        pages: Number of pages to scrape; indices ``0 .. pages - 1`` are used.
        location: Country code passed to ``scrape_search_results``.
        data_pipeline: Pipeline shared by every worker.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(scrape_search_results, keyword, location, page, data_pipeline, retries): page
            for page in range(pages)
        }
        # The results of executor.map() were never read, so a worker that
        # raised failed silently. Report each failure instead.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logger.error(f"Search page {futures[future] + 1} failed: {error}")
def process_business(row, location, retries=3):
    """Scrape the reviews on one business page and write them to a CSV file.

    The page at ``row["trustpilot_url"]`` is fetched through
    ``get_scrapeops_url``, the reviews are read from the ``__NEXT_DATA__``
    JSON blob, and each one is stored through a ``DataPipeline`` whose file
    name is built from ``row["name"]``.

    Args:
        row: Mapping with at least ``"trustpilot_url"`` and ``"name"`` keys.
        location: Country code forwarded to ``get_scrapeops_url``.
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request is inside the try block so connection errors use up
            # a retry instead of escaping the loop.
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script = soup.find("script", id="__NEXT_DATA__")
            if script is None:
                raise Exception("No __NEXT_DATA__ script tag found on the page")

            json_data = json.loads(script.contents[0])
            business_info = json_data["props"]["pageProps"]
            reviews = business_info["reviews"]

            # NOTE: DataPipeline drops records whose name was already seen, so
            # two reviews with the same reviewer display name collapse into one.
            review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            for review in reviews:
                review_data = ReviewData(
                    name=review["consumer"]["displayName"],
                    rating=review["rating"],
                    text=review["text"],
                    title=review["title"],
                    date=review["dates"]["publishedDate"],
                )
                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Run ``process_business`` on a thread pool for every row of ``csv_file``.

    Args:
        csv_file: Path of a CSV file with a header row, as written by the crawl.
        location: Country code passed to ``process_business``.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed to each worker.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its files as UTF-8, so read with the same encoding
    # instead of the platform default.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(process_business, row, location, retries): row
            for row in reader
        }
        # The results of executor.map() were never read, so a worker that
        # raised failed silently. Report each failure instead.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logger.error(f"Failed to scrape {futures[future].get('trustpilot_url')}: {error}")
if __name__ == "__main__":
    # Run settings; change these to tune the scrape.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    # Phase 1: crawl the search results, one CSV file per keyword.
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_path = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_path)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_path)
    logger.info("Crawl complete.")

    # Phase 2: scrape the reviews of every business found by the crawl.
    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
If you'd like to tweak this scraper, feel free to change any of the keyword_list
or any of the following constants as well:
MAX_RETRIES
MAX_THREADS
PAGES
LOCATION
How To Architect Our Trustpilot Scraper
When we build our Trustpilot scraper, we actually need to build two scrapers.
- Our first scraper is a crawler. This crawler will look up search results, interpret them, and store them in a CSV file.
- Our second scraper will be our review scraper.
For the best performance and stability, each of these scrapers will need the following:
- Parsing: so we can pull proper information from a page.
- Pagination: so we can pull up different pages and be more selective about our data.
- Data Storage: to store our data in a safe, efficient and readable way.
- Concurrency: to scrape multiple pages at once.
- Proxy Integration: when scraping anything at scale, we often face the issue of getting blocked. Proxies allow us a redundant connection and reduce our likelihood of getting blocked by different websites.
Understanding How To Scrape Trustpilot
Step 1: How To Request Trustpilot Pages
When we perform a search using Trustpilot, our URL typically looks like this:
https://www.trustpilot.com/search?query=word1+word2
Go ahead and take a look at the screenshot below, which shows a search for the term online bank.
The other type of page we request from Trustpilot is the business page. This is the portion where we scrape our reviews from. This URL is pretty strange, here is the convention:
https://www.trustpilot.com/review/actual_website_domain_name
The example below is a screenshot for good-bank.de.
Since the site's domain name is good-bank.de
, the Trustpilot URL would be:
https://www.trustpilot.com/review/good-bank.de
While this naming convention is unorthodox, we can base our URL system on this.
Step 2: How To Extract Data From Trustpilot Results and Pages
When pulling our data from Trustpilot, we actually get pretty lucky. Our data actually gets saved in the page as a JSON blob.
This is extremely convenient because we don't have to constantly be looking for nested HTML/CSS elements. We need to look for one tag, script
. script
holds JavaScript, and the JavaScript holds our JSON.
Here is the JSON blob from good-bank.de
.
On both our search results, and our business pages, all the information we want is saved in a script
tag with an id
of "__NEXT_DATA__"
.
Step 3: How To Control Pagination
To paginate our results, we can use the following format for our search URL:
https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}
So if we wanted page 1 of online banks, our URL would look like this:
https://www.trustpilot.com/search?query=online+bank&page=1
As we discussed previously, our URL for an individual business is set up like this:
https://www.trustpilot.com/review/actual_website_domain_name
With a system ready for our URLs, we're all set to extract our data.
Step 4: Geolocated Data
To handle geolocated data, we'll be using the ScrapeOps Proxy API.
- If we want to be in Great Britain, we simply set our
country
parameter to "uk".
- If we want to be in the US, we can set this param to
"us"
.
When we pass our country
into the ScrapeOps API, ScrapeOps will actually route our requests through a server in that country, so even if the site checks our geolocation, our geolocation will show up correctly!
Setting Up Our Trustpilot Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir trustpilot-scraper
cd trustpilot-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build A Trustpilot Search Crawler
Step 1: Create Simple Search Data Parser
We'll start by building a parser for our search results. The goal here is pretty simple: fetch a page and pull information from it.
Along with parsing, we'll add some basic retry logic as well. With retries, if we fail to get our information on the first try, our parser will keep trying until it runs out of retries.
The Python script below is pretty basic, but this is the foundation to our entire project.
- while
we still have retries
left and the operation hasn't succeeded, we get the page and find the script
tag with the id
, "__NEXT_DATA__"
.
- From within this object, we pull all of our relevant information from the JSON blob and then print it to the terminal.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Placeholder so the name exists before config.json has been read.
API_KEY = ""

# The ScrapeOps key is stored in config.json under the "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
# Configure the root logger at INFO level and create the module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
    """Fetch the Trustpilot search results for ``keyword`` and print each business.

    The page is requested directly, the JSON held in the ``__NEXT_DATA__``
    script tag is parsed, and one dictionary per business is printed.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` for the URL.
        location: Accepted for interface compatibility; not used by this
            version of the parser.
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    # This version has no page parameter; the URL previously referenced an
    # undefined `page_number`, which raised NameError on every call.
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    contact = business.get("contact") or {}
                    website = contact.get("website") or ""
                    if "://" not in website:
                        # The review URL is derived from the website, so a
                        # record without one cannot be followed up.
                        logger.warning(f"Skipping business without a usable website: {business.get('displayName')}")
                        continue
                    trustpilot_formatted = website.split("://")[1]
                    # Kept in its own name so the `location` argument is not overwritten.
                    business_location = business.get("location") or {}
                    category_list = business.get("categories") or []
                    category = category_list[0]["categoryId"] if category_list else "n/a"

                    ## Extract Data
                    search_data = {
                        "name": business.get("displayName", ""),
                        "stars": business.get("stars", 0),
                        "rating": business.get("trustScore", 0),
                        "num_reviews": business.get("numberOfReviews", 0),
                        "website": website,
                        "trustpilot_url": f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        "location": business_location.get("country", "n/a"),
                        "category": category,
                    }
                    # The record used to be built and then thrown away.
                    print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Only mark success once parsing is done, so a parse error is retried.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Without this increment a failing page was retried forever.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
    """Run the search parser once per requested page.

    NOTE: the parser in this listing is declared as
    ``scrape_search_results(keyword, location, retries=3)`` and takes no page
    argument, so every iteration requests the same results page.

    Args:
        keyword: Search phrase passed to ``scrape_search_results``.
        pages: Number of times the parser is invoked.
        location: Country code passed to ``scrape_search_results``.
        retries: Retry budget passed to ``scrape_search_results``.
    """
    for _ in range(pages):
        # `retries` goes by keyword: the old four-argument positional call did
        # not match the parser's three-parameter signature (TypeError).
        scrape_search_results(keyword, location, retries=retries)
if __name__ == "__main__":
    # Run settings; change these to tune the scrape.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info("Crawl complete.")
Step 2: Add Pagination
Now that we can pull information from a Trustpilot page, we need to be able to decide which page we want to scrape. We can do this by using pagination.
As discussed above, our paginated URL is laid out like this:
https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}
Here is our fully updated code, it hasn't really changed much yet.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Placeholder so the name exists before config.json has been read.
API_KEY = ""

# The ScrapeOps key is stored in config.json under the "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
# Configure the root logger at INFO level and create the module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
    """Fetch one page of Trustpilot search results and print each business.

    The page is requested directly, the JSON held in the ``__NEXT_DATA__``
    script tag is parsed, and one dictionary per business is printed.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` for the URL.
        location: Accepted for interface compatibility; not used by this
            version of the parser.
        page_number: Zero-based page index (the URL uses ``page_number + 1``).
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    contact = business.get("contact") or {}
                    website = contact.get("website") or ""
                    if "://" not in website:
                        # The review URL is derived from the website, so a
                        # record without one cannot be followed up.
                        logger.warning(f"Skipping business without a usable website: {business.get('displayName')}")
                        continue
                    trustpilot_formatted = website.split("://")[1]
                    # Kept in its own name so the `location` argument is not overwritten.
                    business_location = business.get("location") or {}
                    category_list = business.get("categories") or []
                    category = category_list[0]["categoryId"] if category_list else "n/a"

                    search_data = {
                        "name": business.get("displayName", ""),
                        "stars": business.get("stars", 0),
                        "rating": business.get("trustScore", 0),
                        "num_reviews": business.get("numberOfReviews", 0),
                        "website": website,
                        "trustpilot_url": f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        "location": business_location.get("country", "n/a"),
                        "category": category,
                    }
                    # The record used to be built and then thrown away.
                    print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Only mark success once parsing is done, so a parse error is retried.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Without this increment a failing page was retried forever.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
    """Scrape pages ``0 .. pages - 1`` of the search results, one after another."""
    page = 0
    while page < pages:
        scrape_search_results(keyword, location, page, retries)
        page += 1
if __name__ == "__main__":
    # Run settings; change these to tune the scrape.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
    logger.info("Crawl complete.")
We also added a start_scrape()
function which gives us the ability to scrape multiple pages. Later on, we'll add concurrency to this function, but for now, we're just going to use a for
loop as a placeholder.
Step 3: Storing the Scraped Data
Now that we can retrieve our data properly, it's time to start storing it. In this example, we're going to add a SearchData
class and a DataPipeline
class.
SearchData
is a dataclass
and the purpose of it is to simply hold our data. Once we've instantiated the SearchData
, we can pass it into our DataPipeline
.
Take a look at the updated code.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Placeholder so the name exists before config.json has been read.
API_KEY = ""

# The ScrapeOps key is stored in config.json under the "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
# Configure the root logger at INFO level and create the module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business taken from a Trustpilot search-results page.

    After construction every string attribute is normalised: an empty string
    becomes ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill in empty string attributes and strip the non-empty ones."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            if not isinstance(current, str):
                continue
            if current == "":
                # Empty values get a readable placeholder.
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, current.strip())
class DataPipeline:
    """Buffer scraped dataclass records and append them to a CSV file in batches.

    Records are de-duplicated on their ``name`` attribute, queued in memory,
    and flushed to ``csv_filename`` whenever the queue reaches
    ``storage_queue_limit`` or when ``close_pipeline`` is called.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Always clear the flag; the early return on an empty queue used to
            # leave it stuck at True, which blocked every later automatic flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same ``name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once when scraping is finished."""
        if self.csv_file_open:
            # Imported here because the file never imports `time` at the top,
            # so the original call raised NameError.
            import time

            # Give a write that is still in progress a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of Trustpilot search results for ``keyword``.

    The page is requested directly, the JSON held in the ``__NEXT_DATA__``
    script tag is parsed, and one ``SearchData`` record per business is
    handed to ``data_pipeline``.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` for the URL.
        location: Accepted for interface compatibility; not used by this
            version of the parser.
        page_number: Zero-based page index (the URL uses ``page_number + 1``).
        data_pipeline: Object with an ``add_data`` method. When ``None`` the
            records are built but not stored.
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    contact = business.get("contact") or {}
                    website = contact.get("website") or ""
                    if "://" not in website:
                        # The review URL is derived from the website, so a
                        # record without one cannot be followed up.
                        logger.warning(f"Skipping business without a usable website: {business.get('displayName')}")
                        continue
                    trustpilot_formatted = website.split("://")[1]
                    # Kept in its own name so the `location` argument is not overwritten.
                    business_location = business.get("location") or {}
                    category_list = business.get("categories") or []
                    category = category_list[0]["categoryId"] if category_list else "n/a"

                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=website,
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=business_location.get("country", "n/a"),
                        category=category,
                    )
                    if data_pipeline is not None:
                        data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Only mark success once parsing is done, so a parse error is retried.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Without this increment a failing page was retried forever.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape pages ``0 .. pages - 1`` one after another.

    ``max_threads`` is accepted but not used by this sequential version.
    """
    page = 0
    while page < pages:
        scrape_search_results(keyword, location, page, data_pipeline, retries)
        page += 1
if __name__ == "__main__":
    # Run settings; change these to tune the scrape.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    # Crawl the search results, one CSV file per keyword.
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_path = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_path)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_path)
    logger.info("Crawl complete.")
- The
DataPipeline
creates a pipeline to a CSV file. If the file already exists, we append to it. If it doesn't exist, we create it.
- Once the
SearchData
gets passed into our DataPipeline
, the DataPipeline
filters out our duplicates and stores the rest of our relevant data to a CSV file.
Step 4: Adding Concurrency
To maximize our efficiency, we need to add concurrency. Concurrency gives us the ability to process multiple pages at once.
Here we'll be using ThreadPoolExecutor
for multithreading.
Our only major difference here is the start_scrape()
function. Here is what it looks like now:
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search-result pages for ``keyword`` on a thread pool.

    Args:
        keyword: Search phrase passed to ``scrape_search_results``.
        pages: Number of pages to scrape; indices ``0 .. pages - 1`` are used.
        location: Country code passed to ``scrape_search_results``.
        data_pipeline: Pipeline shared by every worker.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(scrape_search_results, keyword, location, page, data_pipeline, retries): page
            for page in range(pages)
        }
        # The results of executor.map() were never read, so a worker that
        # raised failed silently. Report each failure instead.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logger.error(f"Search page {futures[future] + 1} failed: {error}")
Here is the fully updated code.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Placeholder so the name exists before config.json has been read.
API_KEY = ""

# The ScrapeOps key is stored in config.json under the "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
# Configure the root logger at INFO level and create the module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business taken from a Trustpilot search-results page.

    After construction every string attribute is normalised: an empty string
    becomes ``"No <field name>"`` and any other string is stripped of
    surrounding whitespace. Non-string values are left untouched.
    """

    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Fill in empty string attributes and strip the non-empty ones."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            if not isinstance(current, str):
                continue
            if current == "":
                # Empty values get a readable placeholder.
                setattr(self, spec.name, f"No {spec.name}")
            else:
                setattr(self, spec.name, current.strip())
class DataPipeline:
    """Buffer scraped dataclass records and append them to a CSV file in batches.

    Records are de-duplicated on their ``name`` attribute, queued in memory,
    and flushed to ``csv_filename`` whenever the queue reaches
    ``storage_queue_limit`` or when ``close_pipeline`` is called.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Always clear the flag; the early return on an empty queue used to
            # leave it stuck at True, which blocked every later automatic flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same ``name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once when scraping is finished."""
        if self.csv_file_open:
            # Imported here because the file never imports `time` at the top,
            # so the original call raised NameError.
            import time

            # Give a write that is still in progress a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of Trustpilot search results for ``keyword``.

    The page is requested directly, the JSON held in the ``__NEXT_DATA__``
    script tag is parsed, and one ``SearchData`` record per business is
    handed to ``data_pipeline``.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` for the URL.
        location: Accepted for interface compatibility; not used by this
            version of the parser.
        page_number: Zero-based page index (the URL uses ``page_number + 1``).
        data_pipeline: Object with an ``add_data`` method. When ``None`` the
            records are built but not stored.
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]

                for business in business_units:
                    contact = business.get("contact") or {}
                    website = contact.get("website") or ""
                    if "://" not in website:
                        # The review URL is derived from the website, so a
                        # record without one cannot be followed up.
                        logger.warning(f"Skipping business without a usable website: {business.get('displayName')}")
                        continue
                    trustpilot_formatted = website.split("://")[1]
                    # Kept in its own name so the `location` argument is not overwritten.
                    business_location = business.get("location") or {}
                    category_list = business.get("categories") or []
                    category = category_list[0]["categoryId"] if category_list else "n/a"

                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=website,
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=business_location.get("country", "n/a"),
                        category=category,
                    )
                    if data_pipeline is not None:
                        data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Only mark success once parsing is done, so a parse error is retried.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Without this increment a failing page was retried forever.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search-result pages for ``keyword`` on a thread pool.

    Args:
        keyword: Search phrase passed to ``scrape_search_results``.
        pages: Number of pages to scrape; indices ``0 .. pages - 1`` are used.
        location: Country code passed to ``scrape_search_results``.
        data_pipeline: Pipeline shared by every worker.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(scrape_search_results, keyword, location, page, data_pipeline, retries): page
            for page in range(pages)
        }
        # The results of executor.map() were never read, so a worker that
        # raised failed silently. Report each failure instead.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logger.error(f"Search page {futures[future] + 1} failed: {error}")
if __name__ == "__main__":
    # Run settings; change these to tune the scrape.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    # Crawl the search results, one CSV file per keyword.
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_path = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_path)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_path)
    logger.info("Crawl complete.")
With concurrency, we can now scrape numerous pages all at the same time.
Step 5: Bypassing Anti-Bots
In the wild, scrapers are often caught and blocked by anti-bots. Anti-bots are software designed to find and block malicious traffic. While our scraper is not malicious, it is incredibly fast and at this point there is nothing human about it. These abnormalities will likely get us flagged and probably blocked.
To bypass these anti-bots, we need to use a proxy. The ScrapeOps Proxy will rotate our IP address, so each request we make seems like it's coming from a different location. Instead of one bot making a ton of bizarre requests to the server, our scraper will look like many instances of normal client-side traffic.
This function does all of this for us:
def get_scrapeops_url(url, location="us"):
    """Wrap ``url`` in a ScrapeOps proxy URL.

    Args:
        url: The page to fetch through the proxy.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the API key, target URL and country
        URL-encoded into its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Previously hard-coded to "us", which silently ignored the argument.
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
In this example, our code barely changes at all, but it brings us to a production ready level. Take a look at the full code example below.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Placeholder so the name exists before config.json has been read.
API_KEY = ""

# The ScrapeOps key is stored in config.json under the "api_key" entry.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Wrap *url* in a ScrapeOps proxy request URL.

    Args:
        url: Target page to fetch through the proxy.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint URL with the API key, target URL and country
        URL-encoded into its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Honor the caller's location; this was hard-coded to "us", which
        # silently ignored the argument.
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
## Logging
# Configure the root logger to emit INFO and above, and create the
# module-level logger used throughout this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business entry taken from a Trustpilot search results page."""

    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty strings a "No <field>" placeholder; strip the others."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values (numbers, None) are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)
class DataPipeline:
    """Buffer scraped dataclass items, drop repeats by name, append to a CSV.

    Args:
        csv_filename: Path of the CSV file rows are appended to.
        storage_queue_limit: Number of queued items that triggers a flush.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Always release the flag, including on the empty-queue early
            # return and on write errors; otherwise add_data() would never
            # flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True (and log) if an item with this name was seen before."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue an item unless it is a duplicate; flush once the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once scraping has finished."""
        if self.csv_file_open:
            # Imported here because the script's import list has no ``time``;
            # the bare time.sleep() call used to raise NameError.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Fetch one Trustpilot search results page and queue each business on it.

    Args:
        keyword: Search phrase; spaces are sent as ``+``.
        location: Location forwarded to ``get_scrapeops_url``.
        page_number: Zero-based page index (the URL's ``page`` is one-based).
        data_pipeline: Pipeline whose ``add_data`` receives each ``SearchData``.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]
                for business in business_units:
                    website = business.get("contact")["website"]
                    trustpilot_formatted = website.split("://")[1]
                    # Kept under its own name: reusing ``location`` here
                    # overwrote the argument a retry passes to the proxy.
                    business_location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=website,
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=business_location.get("country", "n/a"),
                        category=category,
                    )
                    data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            # Mark success only after parsing, so a parse failure is retried
            # instead of ending the loop silently.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the attempt; without this a persistent failure looped forever.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search result pages for ``keyword`` on a thread pool.

    Args:
        keyword: Search phrase passed to ``scrape_search_results``.
        pages: Number of result pages; page indexes ``0 .. pages-1`` are used.
        location: Location passed through to each worker.
        data_pipeline: Pipeline shared by all workers.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed through to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(
                scrape_search_results, keyword, location, page, data_pipeline, retries
            ): page
            for page in range(pages)
        }
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                # An unconsumed executor.map() iterator dropped worker
                # exceptions silently; report them instead.
                logger.error(f"Search page {futures[future] + 1} failed: {error}")
if __name__ == "__main__":
    # Run settings handed to start_scrape() below.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info("Crawl starting...")

    # INPUT ---> keywords to search for
    keyword_list = ["online bank"]
    aggregate_files = []

    # Each keyword gets its own pipeline and its own CSV report.
    for keyword in keyword_list:
        report_csv = keyword.replace(" ", "-") + ".csv"
        crawl_pipeline = DataPipeline(csv_filename=report_csv)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_csv)

    logger.info("Crawl complete.")
Step 6: Production Run
It's finally time to run our crawler in production. Take a look at our main
. I'm changing a few constants here.
if __name__ == "__main__":
    # Production run settings handed to start_scrape() below.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 10
    LOCATION = "us"

    logger.info("Crawl starting...")

    # INPUT ---> keywords to search for
    keyword_list = ["online bank"]
    aggregate_files = []

    # Each keyword gets its own pipeline and its own CSV report.
    for keyword in keyword_list:
        report_csv = keyword.replace(" ", "-") + ".csv"
        crawl_pipeline = DataPipeline(csv_filename=report_csv)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_csv)

    logger.info("Crawl complete.")
PAGES
has been set to 10
and LOCATION
has been set to "us"
. Now let's see how long it takes to process 10 pages of data.
Here are the results:
We processed 10 pages in just over 4 seconds!!!
Build A Trustpilot Scraper
Our crawler gives us great results in a fast and efficient way. Now we need to pair our crawler with a scraper. This scraper will be pulling information about the individual businesses that we save in the report that our crawler generates.
Our scraper will do the following:
- Open the report we created
- Get the pages from that report
- Pull information from these pages
- Create an individual report for each of the businesses we've looked up
Along with this process, we'll once again utilize our basic steps from the crawler: parsing, storage, concurrency, and proxy integration.
Step 1: Create Simple Business Data Parser
Here, we'll just create a simple parsing function. Take a look below.
def process_business(row, location, retries=3):
    """Fetch one business's Trustpilot page and print each review on it.

    Args:
        row: Crawl-report row; its ``trustpilot_url`` is the page fetched.
        location: Not used by this version of the function.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When no attempt succeeds.
    """
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Fetch inside the try so connection errors are retried as well
            # instead of escaping the loop on the first failure.
            response = requests.get(url)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            # The reviews live in the page's embedded __NEXT_DATA__ JSON blob.
            script = soup.find("script", id="__NEXT_DATA__")
            json_data = json.loads(script.contents[0])
            business_info = json_data["props"]["pageProps"]
            reviews = business_info["reviews"]

            for review in reviews:
                review_data = {
                    "name": review["consumer"]["displayName"],
                    "rating": review["rating"],
                    "text": review["text"],
                    "title": review["title"],
                    "date": review["dates"]["publishedDate"],
                }
                print(review_data)
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")
- This function takes in a `row` from our CSV file and then fetches the `trustpilot_url` of the business.
- Once we've got the page, we once again look for the `script` tag with the `id` of `"__NEXT_DATA__"` to find our JSON blob.
- From within our JSON blob, we pull information from each review listed within the blob.
Step 2: Loading URLs To Scrape
In order to use our process_business()
function, we need to be able to read the rows from our CSV file. Now we're going to fully update our code.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# ScrapeOps API key; the empty default is replaced by the "api_key" entry
# of config.json, which is opened relative to the working directory.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Wrap *url* in a ScrapeOps proxy request URL.

    Args:
        url: Target page to fetch through the proxy.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint URL with the API key, target URL and country
        URL-encoded into its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Honor the caller's location; this was hard-coded to "us", which
        # silently ignored the argument.
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
## Logging
# Configure the root logger to emit INFO and above, and create the
# module-level logger used throughout this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business entry taken from a Trustpilot search results page."""

    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty strings a "No <field>" placeholder; strip the others."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values (numbers, None) are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)
class DataPipeline:
    """Buffer scraped dataclass items, drop repeats by name, append to a CSV.

    Args:
        csv_filename: Path of the CSV file rows are appended to.
        storage_queue_limit: Number of queued items that triggers a flush.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Always release the flag, including on the empty-queue early
            # return and on write errors; otherwise add_data() would never
            # flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True (and log) if an item with this name was seen before."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue an item unless it is a duplicate; flush once the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once scraping has finished."""
        if self.csv_file_open:
            # Imported here because the script's import list has no ``time``;
            # the bare time.sleep() call used to raise NameError.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Fetch one Trustpilot search results page and queue each business on it.

    Args:
        keyword: Search phrase; spaces are sent as ``+``.
        location: Location forwarded to ``get_scrapeops_url``.
        page_number: Zero-based page index (the URL's ``page`` is one-based).
        data_pipeline: Pipeline whose ``add_data`` receives each ``SearchData``.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]
                for business in business_units:
                    website = business.get("contact")["website"]
                    trustpilot_formatted = website.split("://")[1]
                    # Kept under its own name: reusing ``location`` here
                    # overwrote the argument a retry passes to the proxy.
                    business_location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=website,
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=business_location.get("country", "n/a"),
                        category=category,
                    )
                    data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            # Mark success only after parsing, so a parse failure is retried
            # instead of ending the loop silently.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the attempt; without this a persistent failure looped forever.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search result pages for ``keyword`` on a thread pool.

    Args:
        keyword: Search phrase passed to ``scrape_search_results``.
        pages: Number of result pages; page indexes ``0 .. pages-1`` are used.
        location: Location passed through to each worker.
        data_pipeline: Pipeline shared by all workers.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed through to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(
                scrape_search_results, keyword, location, page, data_pipeline, retries
            ): page
            for page in range(pages)
        }
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                # An unconsumed executor.map() iterator dropped worker
                # exceptions silently; report them instead.
                logger.error(f"Search page {futures[future] + 1} failed: {error}")
def process_business(row, location, retries=3):
    """Fetch one business's Trustpilot page and print each review on it.

    Args:
        row: Crawl-report row; its ``trustpilot_url`` is the page fetched.
        location: Not used by this version of the function.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When no attempt succeeds.
    """
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Fetch inside the try so connection errors are retried as well
            # instead of escaping the loop on the first failure.
            response = requests.get(url)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            # The reviews live in the page's embedded __NEXT_DATA__ JSON blob.
            script = soup.find("script", id="__NEXT_DATA__")
            json_data = json.loads(script.contents[0])
            business_info = json_data["props"]["pageProps"]
            reviews = business_info["reviews"]

            for review in reviews:
                review_data = {
                    "name": review["consumer"]["displayName"],
                    "rating": review["rating"],
                    "text": review["text"],
                    "title": review["title"],
                    "date": review["dates"]["publishedDate"],
                }
                print(review_data)
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Run ``process_business`` on every row of a crawl report CSV.

    Args:
        csv_file: Path of the report to read.
        location: Location passed through to ``process_business``.
        max_threads: Not used here; rows are processed one after another.
        retries: Retry budget passed through to ``process_business``.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its CSV files as UTF-8, so read them the same way
    # rather than relying on the platform default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_business(row, location, retries)
if __name__ == "__main__":
    # Run settings shared by the crawl and the per-business scrape.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info("Crawl starting...")

    # INPUT ---> keywords to search for
    keyword_list = ["online bank"]
    aggregate_files = []

    # Each keyword gets its own pipeline and its own CSV report.
    for keyword in keyword_list:
        report_csv = keyword.replace(" ", "-") + ".csv"
        crawl_pipeline = DataPipeline(csv_filename=report_csv)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_csv)

    logger.info("Crawl complete.")

    # Second pass: visit every business listed in each crawl report.
    for report in aggregate_files:
        process_results(report, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
In the example above, our process_results()
function reads the rows from our CSV file and passes each of them into process_business()
.
process_business()
then pulls our information and prints it to the terminal.
Step 3: Storing the Scraped Data
Once again, we're now in the position where we need to store our data. We'll add a ReviewData
class. This class is going to simply hold data, just like our SearchData
.
We then pass our ReviewData
into a DataPipeline
just like we did earlier.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# ScrapeOps API key; the empty default is replaced by the "api_key" entry
# of config.json, which is opened relative to the working directory.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Wrap *url* in a ScrapeOps proxy request URL.

    Args:
        url: Target page to fetch through the proxy.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint URL with the API key, target URL and country
        URL-encoded into its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Honor the caller's location; this was hard-coded to "us", which
        # silently ignored the argument.
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
## Logging
# Configure the root logger to emit INFO and above, and create the
# module-level logger used throughout this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business entry taken from a Trustpilot search results page."""

    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty strings a "No <field>" placeholder; strip the others."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values (numbers, None) are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)
@dataclass
class ReviewData:
    """One review taken from a business's Trustpilot page."""

    name: str = ""
    rating: float = 0
    text: str = ""
    title: str = ""
    date: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty strings a "No <field>" placeholder; strip the others."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values (numbers, None) are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)
class DataPipeline:
    """Buffer scraped dataclass items, drop repeats by name, append to a CSV.

    Args:
        csv_filename: Path of the CSV file rows are appended to.
        storage_queue_limit: Number of queued items that triggers a flush.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Always release the flag, including on the empty-queue early
            # return and on write errors; otherwise add_data() would never
            # flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True (and log) if an item with this name was seen before."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue an item unless it is a duplicate; flush once the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once scraping has finished."""
        if self.csv_file_open:
            # Imported here because the script's import list has no ``time``;
            # the bare time.sleep() call used to raise NameError.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Fetch one Trustpilot search results page and queue each business on it.

    Args:
        keyword: Search phrase; spaces are sent as ``+``.
        location: Location forwarded to ``get_scrapeops_url``.
        page_number: Zero-based page index (the URL's ``page`` is one-based).
        data_pipeline: Pipeline whose ``add_data`` receives each ``SearchData``.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]
                for business in business_units:
                    website = business.get("contact")["website"]
                    trustpilot_formatted = website.split("://")[1]
                    # Kept under its own name: reusing ``location`` here
                    # overwrote the argument a retry passes to the proxy.
                    business_location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=website,
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=business_location.get("country", "n/a"),
                        category=category,
                    )
                    data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            # Mark success only after parsing, so a parse failure is retried
            # instead of ending the loop silently.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the attempt; without this a persistent failure looped forever.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search result pages for ``keyword`` on a thread pool.

    Args:
        keyword: Search phrase passed to ``scrape_search_results``.
        pages: Number of result pages; page indexes ``0 .. pages-1`` are used.
        location: Location passed through to each worker.
        data_pipeline: Pipeline shared by all workers.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed through to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(
                scrape_search_results, keyword, location, page, data_pipeline, retries
            ): page
            for page in range(pages)
        }
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                # An unconsumed executor.map() iterator dropped worker
                # exceptions silently; report them instead.
                logger.error(f"Search page {futures[future] + 1} failed: {error}")
def process_business(row, location, retries=3):
    """Fetch one business's Trustpilot page and save its reviews to a CSV.

    Args:
        row: Crawl-report row; ``trustpilot_url`` is the page fetched and
            ``name`` (spaces replaced by ``-``) names the output CSV.
        location: Not used by this version of the function.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When no attempt succeeds.
    """
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Fetch inside the try so connection errors are retried as well
            # instead of escaping the loop on the first failure.
            response = requests.get(url)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            # The reviews live in the page's embedded __NEXT_DATA__ JSON blob.
            script = soup.find("script", id="__NEXT_DATA__")
            json_data = json.loads(script.contents[0])
            business_info = json_data["props"]["pageProps"]
            reviews = business_info["reviews"]

            # NOTE(review): a business name containing a path separator would
            # make this filename invalid - confirm against real report data.
            review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            for review in reviews:
                review_data = ReviewData(
                    name=review["consumer"]["displayName"],
                    rating=review["rating"],
                    text=review["text"],
                    title=review["title"],
                    date=review["dates"]["publishedDate"],
                )
                review_pipeline.add_data(review_data)
            review_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Run ``process_business`` on every row of a crawl report CSV.

    Args:
        csv_file: Path of the report to read.
        location: Location passed through to ``process_business``.
        max_threads: Not used here; rows are processed one after another.
        retries: Retry budget passed through to ``process_business``.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its CSV files as UTF-8, so read them the same way
    # rather than relying on the platform default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_business(row, location, retries)
if __name__ == "__main__":
    # Run settings shared by the crawl and the per-business scrape.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info("Crawl starting...")

    # INPUT ---> keywords to search for
    keyword_list = ["online bank"]
    aggregate_files = []

    # Each keyword gets its own pipeline and its own CSV report.
    for keyword in keyword_list:
        report_csv = keyword.replace(" ", "-") + ".csv"
        crawl_pipeline = DataPipeline(csv_filename=report_csv)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_csv)

    logger.info("Crawl complete.")

    # Second pass: visit every business listed in each crawl report.
    for report in aggregate_files:
        process_results(report, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 4: Adding Concurrency
Once again, we need to add concurrency. This time, instead of scraping multiple result pages at once, we're obviously going to be scraping multiple business pages at once.
Here is our process_results()
function refactored for concurrency.
def process_results(csv_file, location, max_threads=5, retries=3):
    """Run ``process_business`` on every row of a crawl report, in parallel.

    Args:
        csv_file: Path of the report to read.
        location: Location passed through to ``process_business``.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed through to ``process_business``.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its CSV files as UTF-8, so read them the same way
    # rather than relying on the platform default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(process_business, row, location, retries): row
            for row in reader
        }
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                # An unconsumed executor.map() iterator dropped worker
                # exceptions silently; report them instead.
                logger.error(f"Failed to process {futures[future].get('trustpilot_url')}: {error}")
The rest of our code largely remains the same.
Step 5: Bypassing Anti-Bots
To finish everything off, we once again need to add support for anti-bots. Our final example really only has one relevant change.
response = requests.get(get_scrapeops_url(url, location=location))
Here is the fully updated code:
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# ScrapeOps API key; the empty default is replaced by the "api_key" entry
# of config.json, which is opened relative to the working directory.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Wrap *url* in a ScrapeOps proxy request URL.

    Args:
        url: Target page to fetch through the proxy.
        location: Country code sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint URL with the API key, target URL and country
        URL-encoded into its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        # Honor the caller's location; this was hard-coded to "us", which
        # silently ignored the argument.
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
## Logging
# Configure the root logger to emit INFO and above, and create the
# module-level logger used throughout this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One business entry taken from a Trustpilot search results page."""

    name: str = ""
    stars: float = 0
    rating: float = 0
    num_reviews: int = 0
    website: str = ""
    trustpilot_url: str = ""
    location: str = ""
    category: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty strings a "No <field>" placeholder; strip the others."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values (numbers, None) are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)
@dataclass
class ReviewData:
    """One review taken from a business's Trustpilot page."""

    name: str = ""
    rating: float = 0
    text: str = ""
    title: str = ""
    date: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Give empty strings a "No <field>" placeholder; strip the others."""
        for spec in fields(self):
            current = getattr(self, spec.name)
            # Non-string values (numbers, None) are left untouched.
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)
class DataPipeline:
    """Buffer scraped dataclass items, drop repeats by name, append to a CSV.

    Args:
        csv_filename: Path of the CSV file rows are appended to.
        storage_queue_limit: Number of queued items that triggers a flush.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Always release the flag, including on the empty-queue early
            # return and on write errors; otherwise add_data() would never
            # flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True (and log) if an item with this name was seen before."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue an item unless it is a duplicate; flush once the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush whatever is still queued; call once scraping has finished."""
        if self.csv_file_open:
            # Imported here because the script's import list has no ``time``;
            # the bare time.sleep() call used to raise NameError.
            import time
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Fetch one Trustpilot search results page and queue each business on it.

    Args:
        keyword: Search phrase; spaces are sent as ``+``.
        location: Location forwarded to ``get_scrapeops_url``.
        page_number: Zero-based page index (the URL's ``page`` is one-based).
        data_pipeline: Pipeline whose ``add_data`` receives each ``SearchData``.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.trustpilot.com/search?query={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.find("script", id="__NEXT_DATA__")
            if script_tag:
                json_data = json.loads(script_tag.contents[0])
                business_units = json_data["props"]["pageProps"]["businessUnits"]
                for business in business_units:
                    website = business.get("contact")["website"]
                    trustpilot_formatted = website.split("://")[1]
                    # Kept under its own name: reusing ``location`` here
                    # overwrote the argument a retry passes to the proxy.
                    business_location = business.get("location")
                    category_list = business.get("categories")
                    category = category_list[0]["categoryId"] if len(category_list) > 0 else "n/a"

                    search_data = SearchData(
                        name=business.get("displayName", ""),
                        stars=business.get("stars", 0),
                        rating=business.get("trustScore", 0),
                        num_reviews=business.get("numberOfReviews", 0),
                        website=website,
                        trustpilot_url=f"https://www.trustpilot.com/review/{trustpilot_formatted}",
                        location=business_location.get("country", "n/a"),
                        category=category,
                    )
                    data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            # Mark success only after parsing, so a parse failure is retried
            # instead of ending the loop silently.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the attempt; without this a persistent failure looped forever.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape ``pages`` search result pages for ``keyword`` on a thread pool.

    Args:
        keyword: Search phrase passed to ``scrape_search_results``.
        pages: Number of result pages; page indexes ``0 .. pages-1`` are used.
        location: Location passed through to each worker.
        data_pipeline: Pipeline shared by all workers.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed through to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(
                scrape_search_results, keyword, location, page, data_pipeline, retries
            ): page
            for page in range(pages)
        }
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                # An unconsumed executor.map() iterator dropped worker
                # exceptions silently; report them instead.
                logger.error(f"Search page {futures[future] + 1} failed: {error}")
def process_business(row, location, retries=3):
    """Fetch one business's Trustpilot page via the proxy and save its reviews.

    Args:
        row: Crawl-report row; ``trustpilot_url`` is the page fetched and
            ``name`` (spaces replaced by ``-``) names the output CSV.
        location: Location forwarded to ``get_scrapeops_url``.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When no attempt succeeds.
    """
    url = row["trustpilot_url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Fetch inside the try so connection errors are retried as well
            # instead of escaping the loop on the first failure.
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            # The reviews live in the page's embedded __NEXT_DATA__ JSON blob.
            script = soup.find("script", id="__NEXT_DATA__")
            json_data = json.loads(script.contents[0])
            business_info = json_data["props"]["pageProps"]
            reviews = business_info["reviews"]

            # NOTE(review): a business name containing a path separator would
            # make this filename invalid - confirm against real report data.
            review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            for review in reviews:
                review_data = ReviewData(
                    name=review["consumer"]["displayName"],
                    rating=review["rating"],
                    text=review["text"],
                    title=review["title"],
                    date=review["dates"]["publishedDate"],
                )
                review_pipeline.add_data(review_data)
            review_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['trustpilot_url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['trustpilot_url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Run `process_business` for every row of a crawl report CSV.

    The CSV is loaded fully into memory as a list of dict rows, then the
    rows are processed on a thread pool with up to `max_threads` workers.
    Every call receives the same `location` and `retries`.
    """
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as report:
        rows = list(csv.DictReader(report))
        row_count = len(rows)

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pool:
            # executor.map zips its iterables, so the shared arguments are
            # repeated once per row.
            locations = [location for _ in range(row_count)]
            retry_budgets = [retries for _ in range(row_count)]
            pool.map(process_business, rows, locations, retry_budgets)
if __name__ == "__main__":
    # Run settings for a small test crawl.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    # Phase 1: crawl the search results for each keyword into its own CSV.
    for keyword in keyword_list:
        report_name = keyword.replace(" ", "-") + ".csv"

        crawl_pipeline = DataPipeline(csv_filename=report_name)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_name)
    logger.info(f"Crawl complete.")

    # Phase 2: scrape the reviews of every business listed in each report.
    for report in aggregate_files:
        process_results(report, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Let's run both the crawler and the scraper together in production. Here is our updated `main`.
if __name__ == "__main__":
    # Run settings for the production crawl.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 10
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["online bank"]
    aggregate_files = []

    ## Job Processes
    # Phase 1: crawl the search results for each keyword into its own CSV.
    for keyword in keyword_list:
        report_name = keyword.replace(" ", "-") + ".csv"

        crawl_pipeline = DataPipeline(csv_filename=report_name)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(report_name)
    logger.info(f"Crawl complete.")

    # Phase 2: scrape the reviews of every business listed in each report.
    for report in aggregate_files:
        process_results(report, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
As before, I've changed our `PAGES` to `10` and our `LOCATION` to `"us"`. Here are the results.
It took just over 100 seconds (including the time it took to create our initial report) to generate a full report and process all the results (86 rows). This comes out to a speed of about 1.17 seconds per business.
Legal and Ethical Considerations
When scraping any website, you need to always pay attention to the site's terms and conditions. You can view Trustpilot's consumer terms here.
You should also respect the site's robots.txt.
You can view their `robots.txt` file here.
Always be careful about the information you extract and don't scrape private or confidential data.
- If a website is hidden behind a login, that is generally considered private data.
- If your data does not require a login, it is generally considered to be public data.
If you have questions about the legality of your scraping job, it is best to consult an attorney familiar with the laws and localities you're dealing with.
Conclusion
You now know how to build both a crawler and a scraper for Trustpilot. You know how to utilize parsing, pagination, data storage, concurrency, and proxy integration. You should also know how to deal with blobs of JSON data. Dealing with JSON is a very important skill not only in web scraping but software development in general.
If you'd like to learn more about the tools used in this article, take a look at the links below:
More Python Web Scraping Guides
Now that you've got these new skills, it's time to practice them... Go build something!
Here at ScrapeOps, we've got loads of resources for you to learn from. If you're in the mood to learn more, check out our Python Web Scraping Playbook or take a look at the articles below.