How to Scrape SimilarWeb With Requests and BeautifulSoup
SimilarWeb is a great place to find useful information about any website, such as rank, category, rank_change, average_visit, pages_per_visit, and bounce_rate. Each of these metrics can provide critical data and insight into what users are doing when they access the site.
In this tutorial, we're going to learn how to scrape SimilarWeb using Requests & BeautifulSoup.
- TLDR: How to Scrape SimilarWeb
- How To Architect Our Scraper
- Understanding How To Scrape SimilarWeb
- Setting Up Our SimilarWeb Scraper
- Build A SimilarWeb Search Crawler
- Build A SimilarWeb Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
TLDR - How to Scrape SimilarWeb
Scraping SimilarWeb can be a really difficult job. For starters, SimilarWeb blocks scrapers after a certain number of requests, so we absolutely need a proxy with rotating IP addresses.
If you want to scrape it, use the scraper below.
- Create a new project folder with a config.json file.
- After creating the config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
- Then, copy/paste the code below into a new Python file.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")
rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
else:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
blocked = soup.find("div", class_="wa-limit-modal")
if blocked:
raise Exception(f"Blocked")
competitors = soup.find_all("div", class_="wa-competitors__list-item")
competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for competitor in competitors:
site_name = competitor.find("span", class_="wa-competitors__list-item-title").text
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find("span", class_="app-progress__value").text
target_spans = competitor.find_all("span", "wa-competitors__list-column")
monthly_visits = target_spans[2].text
category = target_spans[3].text
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0"))
competitor_data = CompetitorData(
name=site_name,
url=link,
affinity=affinity,
monthly_visits=monthly_visits,
category=category,
category_rank=category_rank
)
competitor_pipeline.add_data(competitor_data)
competitor_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_website(row, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []
## Job Processes
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, retries=MAX_RETRIES)
To tweak your results, feel free to change any of the following:
- MAX_THREADS: Defines the number of concurrent threads used during the scraping and processing tasks.
- MAX_RETRIES: Determines the maximum number of retries the script will attempt if a request fails (e.g., due to a network issue or a non-200 status code).
- keyword_list: A list of dictionaries where each dictionary contains a "category" and "subcategory" that specify the type of websites to scrape from SimilarWeb.
- filename: The base name used to create the CSV file where the scraped data will be saved.
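For example, a tweaked run might look like the snippet below. The category and subcategory slugs here are assumptions for illustration; confirm them against the URL you see in your browser before using them.
MAX_RETRIES = 5
MAX_THREADS = 3

# Hypothetical example inputs -- swap in whichever categories you actually want to crawl
keyword_list = [
    {"category": "news-and-media", "subcategory": "sports-news"}
]
filename = "news-and-media"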
How To Architect Our SimilarWeb Scraper
Scraping SimilarWeb can be challenging. As soon as you really try to do anything with it, you're prompted to create an account in order to gain any real access to the site.
This not only makes the site difficult to scrape, but it makes it difficult to even access from a traditional browser! Even though we're prompted to create an account, we can still perform some actions before we wind up getting blocked.
With the ScrapeOps Proxy Aggregator, we can use rotating proxies to get past this. We're being blocked based on our IP address, and it's much more difficult to block us if we're using a fresh IP with each request.
Our SimilarWeb scraper will follow a similar architecture to most of our other scraping projects from our "How to Scrape" series. We'll need to build both a crawler and a scraper.
- Our crawler will look up the top sites for a particular category.
- Then, our scraper will go through and scrape the competitors and their respective info for each of these top ranked sites.
We'll use an iterative building process to add the following features:
- Lookup a particular site and parse its data.
- Store the parsed data inside an easy to manage CSV file.
- Concurrently search multiple categories at the same time.
- Use the ScrapeOps Proxy Aggregator to get past these anti-bots and free trial prompts.
Our scraper will be built in the following iterations:
- Look up and parse the competitors from a row in the CSV file generated earlier.
- Save the competitors of each site to a new CSV report.
- Concurrently run steps 1 and 2 simultaneously.
- Once again, use the ScrapeOps Proxy Aggregator to get past any anti-bots and free trial prompts.
Understanding How To Scrape SimilarWeb
We need to understand SimilarWeb at a high level before we start writing any serious code.
In these next few sections, let's take a look at exactly how we're going to access our information and how to pick it from the page.
Step 1: How To Request SimilarWeb Pages
Like everything else on the web, we need to begin with a simple GET request. The front page of SimilarWeb isn't really very useful, so we'll query a specific endpoint.
In this case, we'll be looking up the top 50 humor sites. Here is our URL:
https://www.similarweb.com/top-websites/arts-and-entertainment/humor/
The URL gets laid out in the following structure:
https://www.similarweb.com/top-websites/{CATEGORY}/{SUBCATEGORY}/
For any specific search, we need both a category and a subcategory. In this case, our category is "arts-and-entertainment" while our subcategory is "humor".
You can view a shot of the page below.
When you view the page for a specific site, the URL looks like this:
https://www.similarweb.com/website/pikabu.ru/
The layout goes as follows:
https://www.similarweb.com/website/{NAME_OF_SITE}/
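If you'd like to build these URLs programmatically, a small sketch like the one below works. The helper names are hypothetical; the URL patterns are the same ones laid out above.
def top_websites_url(category, subcategory):
    # Ranking page, e.g. the top 50 humor sites
    return f"https://www.similarweb.com/top-websites/{category}/{subcategory}/"

def website_url(site_name):
    # Individual site page, e.g. pikabu.ru
    return f"https://www.similarweb.com/website/{site_name}/"

print(top_websites_url("arts-and-entertainment", "humor"))
print(website_url("pikabu.ru"))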
Step 2: How To Extract Data From SimilarWeb Results and Pages
Extracting the data can be a little tricky. However, if we have access to the site, it is completely doable. For starters, some of our content is loaded dynamically.
To load our dynamic content, we need to use the wait parameter when talking to ScrapeOps. After we have our loaded page, we just need to find the information using its CSS class.
For the results pages, each row has a class of top-table__row. We can find all of these rows and easily extract their data from there.
To extract our competitors, we find div elements with the class wa-competitors__list-item. Each of these div tags holds all the data for one competitor.
On top of this, we need to be aware of the modal that SimilarWeb uses to block us. If this modal is present, we need to retry our request. As you can see in the image below, it's a div with a class of wa-limit-modal.
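To tie these selectors together, here is a minimal sketch of them in isolation. The HTML below is a tiny hand-written stand-in, not SimilarWeb's real markup.
from bs4 import BeautifulSoup

# Tiny stand-in for a fetched page -- the real markup is far larger
html = """
<div class="wa-limit-modal">Sign up to continue</div>
<table><tr class="top-table__row"></tr></table>
<div class="wa-competitors__list-item"></div>
"""
soup = BeautifulSoup(html, "html.parser")

# If the blocking modal is present, we should retry the request
blocked = soup.find("div", class_="wa-limit-modal")

# Ranking rows from a top-websites page
rows = soup.find_all("tr", class_="top-table__row")

# Competitor cards from an individual website page
competitors = soup.find_all("div", class_="wa-competitors__list-item")

print(bool(blocked), len(rows), len(competitors))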
Step 3: Geolocated Data
ScrapeOps gives us the ability to control our geolocation via the country parameter. However, with SimilarWeb we don't want to control our geolocation.
Instead of controlling our location, we want as many IP addresses as possible to reduce our likelihood of getting blocked and asked to sign in or sign up, like you saw in the previous section.
By not controlling our location, this gives us a much larger pool of IP addresses to use.
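To make that concrete, here's a quick sketch of the two payload styles. We use the first one throughout this article; the example key, target URL, and "us" country code below are placeholders.
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder; loaded from config.json in our scripts
url = "https://www.similarweb.com/top-websites/arts-and-entertainment/humor/"

# What we use: no "country" key, so ScrapeOps can route us through any server it supports
payload_any_location = {"api_key": API_KEY, "url": url, "wait": 3000}

# What we avoid here: pinning a location shrinks the pool of usable IP addresses
payload_pinned_location = {"api_key": API_KEY, "url": url, "wait": 3000, "country": "us"}

proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload_any_location)
print(proxy_url)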
Setting Up Our SimilarWeb Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir similarweb-scraper
cd similarweb-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
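Finally, add the config.json file mentioned in the TLDR section to the project folder and paste in your ScrapeOps API key:
{"api_key": "your-super-secret-api-key"}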
Build A SimilarWeb Search Crawler
Time to start building! In the next few sections, we'll go through and build our crawler piece by piece.
We'll start with a parser. Next we'll add data storage. Then, we'll add in concurrency. Finally, we'll add proxy integration.
Step 1: Create Simple Search Data Parser
Parsing is the first step of our scrape. In the code below, we create our basic script and add structure like error handling and retries. Most importantly, we implement our base parsing function.
To see how our data gets extracted, pay close attention to scrape_search_results().
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")
rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text
search_data = {
"name": site_name,
"url": link,
"rank": rank,
"rank_change": rank_change,
"average_visit": average_visit,
"pages_per_visit": pages_per_visit,
"bounce_rate": bounce_rate
}
rank+=1
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, retries=3):
for keyword in keywords:
scrape_search_results(keyword, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []
## Job Processes
filename = "arts-and-entertainment"
start_scrape(keyword_list, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
When parsing the page, this is where we extract our data:
- First, we find all our rows with rows = soup.find_all("tr", class_="top-table__row").
- We find our link_holder with row.find("a", class_="tw-table__compare").
- Using the link_holder object, we extract our site_name and construct our link.
- rank_change_holder.find("span").get("class")[1] is used to find whether the rank went up or down.
- We then find the average visit with row.find("span", class_="tw-table__avg-visit-duration").text.
- float(row.find("span", class_="tw-table__pages-per-visit").text) finds our pages_per_visit.
- Finally, we get our bounce_rate with row.find("span", class_="tw-table__bounce-rate").text.
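If the rank-change logic looks odd, here is a small standalone sketch of it using a simplified, hand-written row rather than SimilarWeb's real markup:
from bs4 import BeautifulSoup

# Simplified stand-in markup: the second class on the inner span tells us the direction
sample_row = """
<table><tr>
<td class="top-table__column top-table__column--rank-change">
  <span class="change change--up"></span>3
</td>
</tr></table>
"""
holder = BeautifulSoup(sample_row, "html.parser").find("td")

rank_change = 0
up_or_down = holder.find("span").get("class")[1]  # e.g. "change--up"
if "change--up" in up_or_down:
    rank_change += int(holder.text)
elif "change--down" in up_or_down:
    rank_change -= int(holder.text)

print(rank_change)  # 3 with this sample markup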
Step 2: Storing the Scraped Data
Once we've got our data, we need to store it. To store our data, we need to make a couple of classes: a dataclass called SearchData and a DataPipeline.
SearchData will be used to represent individual objects from our search results. Once we have a SearchData object, we need to pass it into a DataPipeline.
Our DataPipeline is used to open a pipe to a CSV file. The pipeline filters out duplicates by name and then saves all non-duplicate objects to the file.
Here is our SearchData class. We use this to represent individual ranking results.
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is our DataPipeline.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
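Before we wire it into the crawler, here's a minimal, hypothetical usage sketch. It assumes the SearchData and DataPipeline classes above are already defined; the values passed in are made up for illustration.
pipeline = DataPipeline(csv_filename="example.csv")

# Non-duplicate items get queued and flushed to the CSV
pipeline.add_data(SearchData(name="pikabu.ru", url="https://www.similarweb.com/website/pikabu.ru/", rank=1))

# A repeated name gets logged as a duplicate and dropped
pipeline.add_data(SearchData(name="pikabu.ru", rank=2))

# Flush anything still sitting in the queue
pipeline.close_pipeline()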
When we put it all together, we need to open a new DataPipeline and pass it into start_scrape(). start_scrape() then passes the pipeline into our parsing function.
Instead of printing our parsed data, we now pass that into the pipeline. Once we're finished parsing the results, we go ahead and close the DataPipeline.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")
rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, data_pipeline=None, retries=3):
for keyword in keywords:
scrape_search_results(keyword, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []
## Job Processes
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- Each item in our results is represented in our code as SearchData.
- These SearchData objects then get passed into our DataPipeline and saved to a CSV file.
Step 3: Adding Concurrency
Now, we need to add concurrency. We'll use ThreadPoolExecutor to add support for multithreading. Once we can open multiple threads, we can use those threads to run our parsing function on multiple pages concurrently.
Here is our start_scrape() function adjusted for concurrency.
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
- scrape_search_results is the function we'd like to call using multiple threads.
- keywords is the array of things we'd like to search.
- All other args to scrape_search_results get passed in as arrays.
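If the argument arrays look strange, here is a tiny, self-contained sketch of the same pattern outside of our scraper. The function and values are made up purely for illustration.
import concurrent.futures

def greet(name, greeting, retries):
    print(f"{greeting}, {name}! (retries={retries})")

names = ["humor", "animation-and-comics"]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Each positional argument gets its own iterable; item i of every iterable
    # is handed to call i, so each call here gets the same greeting and retry count.
    executor.map(greet, names, ["hello"] * len(names), [3] * len(names))

Here is our full crawler with concurrency added.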
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")
rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []
## Job Processes
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
We now have the concurrency capability to crawl multiple categories at once.
Step 4: Bypassing Anti-Bots
To properly scrape SimilarWeb, we need a ton of IP addresses.
To get as many addresses as possible, we're going to use just three parameters: API_KEY, url, and wait. This tells ScrapeOps that we want to wait 3 seconds for content to render and we don't care which country we're routed through.
This gives us the largest pool of potential IP addresses because we can be routed through any server that ScrapeOps supports.
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
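As a quick sanity check, wrapping one of our target URLs looks roughly like this; the printed query string will contain your actual API key and the URL-encoded target.
target = "https://www.similarweb.com/top-websites/arts-and-entertainment/humor/"
print(get_scrapeops_url(target))
# Roughly: https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.similarweb.com%2F...&wait=3000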
The code below contains our production-ready crawler.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")
rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []
## Job Processes
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 5: Production Run
Alright! It's time to run this code in production. As you've noticed, we have our MAX_THREADS set to 5. We're only searching 2 categories, so ThreadPoolExecutor will run this on 2 threads and finish it out.
In the next half of our article, when we write the scraper, we'll take advantage of all 5 threads.
Here is our main.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []
## Job Processes
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Here are the results from our crawl. The run times are all over the place: one run took 20.645 seconds, while the next took 66.588 seconds. This shows that when SimilarWeb begins blocking us, ScrapeOps keeps rotating to new servers until each request is successful.
Build A SimilarWeb Scraper
Now that we're running a proper crawl and saving the results, we need to do something with those results. In this section, we'll go through and scrape the competitors to each site we extracted during the crawl.
The scraper needs to do the following:
- Read the CSV into an array.
- Parse the websites from the array.
- Store the competitor data from the parsing stage.
- Run steps 2 and 3 concurrently for faster results.
- Integrate with the ScrapeOps Proxy Aggregator to get past anti-bots and other roadblocks.
Step 1: Create Simple Website Data Parser
Just like before, we're going to start with a parsing function. This one will find all of the competitor objects on the page and extract their data.
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
else:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
blocked = soup.find("div", class_="wa-limit-modal")
if blocked:
raise Exception(f"Blocked")
competitors = soup.find_all("div", class_="wa-competitors__list-item")
for competitor in competitors:
site_name = competitor.find("span", class_="wa-competitors__list-item-title").text
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find("span", class_="app-progress__value").text
target_spans = competitor.find_all("span", "wa-competitors__list-column")
monthly_visits = target_spans[2].text
category = target_spans[3].text
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0"))
competitor_data = {
"name": site_name,
"url": link,
"affinity": affinity,
"monthly_visits": monthly_visits,
"category": category,
"category_rank": category_rank
}
print(competitor_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
This time, during our parse, we execute these steps:
- Find all of the competitor rows: soup.find_all("div", class_="wa-competitors__list-item").
- Iterate through the competitor rows.
- For each competitor, we pull the following: site_name, affinity, monthly_visits, category, and category_rank.
- We construct the URL by once again formatting the site_name.
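The category_rank cleanup is the only fiddly part, so here's a tiny sketch of how that expression behaves on a couple of sample strings (the sample values are made up):
def clean_category_rank(raw):
    # "#1,234" -> 1234, and "--" (no rank shown) -> 0
    return int(raw.replace("#", "").replace(",", "").replace("--", "0"))

print(clean_category_rank("#1,234"))  # 1234
print(clean_category_rank("--"))      # 0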
Step 2: Loading URLs To Scrape
We have our parsing function, but it needs a URL to work. Here, we'll add another function that reads the URLs from the CSV and calls process_website() on each row from the file.
Here is our process_results() function.
def process_results(csv_file, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_website(row, retries=retries)
You can see how it all fits together in the code below.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")
rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
else:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
blocked = soup.find("div", class_="wa-limit-modal")
if blocked:
raise Exception(f"Blocked")
competitors = soup.find_all("div", class_="wa-competitors__list-item")
for competitor in competitors:
site_name = competitor.find("span", class_="wa-competitors__list-item-title").text
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find("span", class_="app-progress__value").text
target_spans = competitor.find_all("span", "wa-competitors__list-column")
monthly_visits = target_spans[2].text
category = target_spans[3].text
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0"))
competitor_data = {
"name": site_name,
"url": link,
"affinity": affinity,
"monthly_visits": monthly_visits,
"category": category,
"category_rank": category_rank
}
print(competitor_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_website(row, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []
## Job Processes
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, retries=MAX_RETRIES)
- process_results() reads our CSV into an array.
- For each row of the file, we run process_website() on the row.
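For reference, each row handed to process_website() is just a dictionary keyed by the headers of the crawl CSV. A row might look roughly like the sketch below; the values are illustrative, and csv.DictReader always gives us strings.
row = {
    "name": "pikabu.ru",
    "url": "https://www.similarweb.com/website/pikabu.ru/",
    "rank": "1",
    "rank_change": "0",
    "average_visit": "00:09:19",
    "pages_per_visit": "8.5",
    "bounce_rate": "25%"
}
print(row["url"])  # this is the page process_website() will fetch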
Step 3: Storing the Scraped Data
Without storage, there wouldn't be much point in scraping to begin with. We've already got the DataPipeline; we just need a dataclass to feed into it. We're going to create a new one called CompetitorData. It's very much like our SearchData.
Here is our CompetitorData class.
@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
In our revised code below, we open another DataPipeline inside our parsing function and we pass CompetitorData into it.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")
rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
else:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
blocked = soup.find("div", class_="wa-limit-modal")
if blocked:
raise Exception(f"Blocked")
competitors = soup.find_all("div", class_="wa-competitors__list-item")
competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for competitor in competitors:
site_name = competitor.find("span", class_="wa-competitors__list-item-title").text
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find("span", class_="app-progress__value").text
target_spans = competitor.find_all("span", "wa-competitors__list-column")
monthly_visits = target_spans[2].text
category = target_spans[3].text
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0"))
competitor_data = CompetitorData(
name=site_name,
url=link,
affinity=affinity,
monthly_visits=monthly_visits,
category=category,
category_rank=category_rank
)
competitor_pipeline.add_data(competitor_data)
competitor_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_website(row, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []
## Job Processes
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, retries=MAX_RETRIES)
- `CompetitorData` is used to represent the competitors we extract from the page.
- We open a new `DataPipeline` inside of our parsing function and pass these `CompetitorData` objects into the pipeline, as sketched below.
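To see that flow in isolation, here is a minimal sketch of the pattern. The site name, metrics, and filename below are made up purely for illustration; the `DataPipeline` and `CompetitorData` classes are the ones defined in the code above.

# Hypothetical example values; in the real scraper these come from the parsed competitor rows
competitor_pipeline = DataPipeline(csv_filename="example-site-com.csv")

competitor_data = CompetitorData(
    name="example-site.com",
    url="https://www.similarweb.com/website/example-site.com/",
    affinity="100%",
    monthly_visits="12.3M",
    category="Arts and Entertainment > Humor",
    category_rank=1
)

# add_data() drops duplicates and flushes the queue to the CSV once the limit is reached
competitor_pipeline.add_data(competitor_data)

# close_pipeline() writes any rows still sitting in the queue
competitor_pipeline.close_pipeline()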
Step 4: Adding Concurrency
We now need to add concurrency. Instead of searching multiple categories this time, we'll need to run our parsing function on multiple rows simultaneously.
To accomplish this, we're going to refactor `process_results()` to take advantage of multiple threads using `ThreadPoolExecutor`.

Here is our multithreaded `process_results()`.
def process_results(csv_file, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
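            # Hand each CSV row to process_website on its own thread; retries is repeated so every call gets the same value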
executor.map(
process_website,
reader,
[retries] * len(reader)
)
- `process_website` is the function we want to call on multiple threads.
- `reader` is the array of objects that we want to process with multiple threads.
- `retries` gets passed in as an array the length of `reader` as well.

All arguments to `process_website` get passed into `executor.map()` as arrays. `executor.map()` then takes one element from each array and passes that set of arguments to `process_website` on each call.
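If it isn't obvious how those repeated arrays line up, the toy example below shows the mechanics. The function, site names, and values are made up purely for illustration and are not part of the scraper.

import concurrent.futures

def greet(name, greeting, retries):
    # Each call receives one element from each iterable passed to executor.map()
    return f"{greeting}, {name}! (retries={retries})"

names = ["site-a.com", "site-b.com", "site-c.com"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        greet,
        names,                    # one name per call
        ["Hello"] * len(names),   # the same greeting repeated for every call
        [3] * len(names)          # the same retry count repeated for every call
    )
    for result in results:
        print(result)

This is the same reason `start_scrape()` passes `[data_pipeline] * len(keywords)` and `[retries] * len(keywords)`: every call needs a matching entry in each iterable, even when the value is identical.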
Here is our full code up to this point.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
import time
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")
rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
    while tries <= retries and not success:
        try:
            response = requests.get(url)
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
else:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
blocked = soup.find("div", class_="wa-limit-modal")
if blocked:
raise Exception(f"Blocked")
competitors = soup.find_all("div", class_="wa-competitors__list-item")
competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for competitor in competitors:
site_name = competitor.find("span", class_="wa-competitors__list-item-title").text
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find("span", class_="app-progress__value").text
target_spans = competitor.find_all("span", "wa-competitors__list-column")
monthly_visits = target_spans[2].text
category = target_spans[3].text
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0"))
competitor_data = CompetitorData(
name=site_name,
url=link,
affinity=affinity,
monthly_visits=monthly_visits,
category=category,
category_rank=category_rank
)
competitor_pipeline.add_data(competitor_data)
competitor_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
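            # Hand each CSV row to process_website on its own thread; retries is repeated so every call gets the same value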
executor.map(
process_website,
reader,
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []
## Job Processes
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)