How to Scrape SimilarWeb With Requests and BeautifulSoup

SimilarWeb is a great place to find useful information about any website, such as its rank, category, rank change, average visit duration, pages per visit, and bounce rate. Each of these metrics can provide critical insight into what users are doing when they access the site.

In this tutorial, we're going to learn how to scrape SimilarWeb using Requests & BeautifulSoup.


TLDR - How to Scrape SimilarWeb

Scraping SimilarWeb can be a really difficult job. For starters, SimilarWeb blocks visitors after a certain amount of access, so we absolutely need a proxy with rotating IP addresses.

If you want to scrape it, use the scraper below.

  1. Create a new project folder with a config.json file.
  2. After creating the config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
  3. Then, copy and paste the code below into a new Python file.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "wait": 3000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    rank: int = 0
    rank_change: int = 0
    average_visit: str = ""
    pages_per_visit: float = 0.0
    bounce_rate: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class CompetitorData:
    name: str = ""
    url: str = ""
    affinity: str = ""
    monthly_visits: str = ""
    category: str = ""
    category_rank: int = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, data_pipeline=None, retries=3):
    url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            rows = soup.find_all("tr", class_="top-table__row")

            rank = 1
            for row in rows:
                link_holder = row.find("a", class_="tw-table__compare")
                site_name = link_holder.text
                link = f"https://www.similarweb.com/website/{site_name}/"

                rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
                rank_change = 0
                up_or_down = rank_change_holder.find("span").get("class")[1]
                if "change--up" in up_or_down:
                    rank_change += int(rank_change_holder.text)
                elif "change--down" in up_or_down:
                    rank_change -= int(rank_change_holder.text)

                average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
                pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
                bounce_rate = row.find("span", class_="tw-table__bounce-rate").text

                search_data = SearchData(
                    name=site_name,
                    url=link,
                    rank=rank,
                    rank_change=rank_change,
                    average_visit=average_visit,
                    pages_per_visit=pages_per_visit,
                    bounce_rate=bounce_rate
                )
                rank += 1

                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )


def process_website(row, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")
            else:
                logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            blocked = soup.find("div", class_="wa-limit-modal")
            if blocked:
                raise Exception(f"Blocked")
            competitors = soup.find_all("div", class_="wa-competitors__list-item")

            competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            for competitor in competitors:
                site_name = competitor.find("span", class_="wa-competitors__list-item-title").text
                link = f"https://www.similarweb.com/website/{site_name}/"
                affinity = competitor.find("span", class_="app-progress__value").text
                target_spans = competitor.find_all("span", "wa-competitors__list-column")

                monthly_visits = target_spans[2].text
                category = target_spans[3].text
                category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0"))

                competitor_data = CompetitorData(
                    name=site_name,
                    url=link,
                    affinity=affinity,
                    monthly_visits=monthly_visits,
                    category=category,
                    category_rank=category_rank
                )

                competitor_pipeline.add_data(competitor_data)

            competitor_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_website(row, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
    aggregate_files = []

    ## Job Processes
    filename = "arts-and-entertainment"

    crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
    start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, retries=MAX_RETRIES)

To tweak your results, feel free to change any of the following (a short example follows the list):

  • MAX_THREADS: Defines the number of concurrent threads used during the scraping and processing tasks.
  • MAX_RETRIES: Determines the maximum number of retries the script will attempt if a request fails (e.g., due to a network issue or a non-200 status code).
  • keyword_list: A list of dictionaries where each dictionary contains a "category" and "subcategory" that specify the type of websites to scrape from SimilarWeb.
  • filename: The base name used to create the CSV file where the scraped data will be saved.
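
For example, here is a quick sketch of a tweaked run. The category and subcategory values must match real slugs from SimilarWeb's top-websites URLs; the ones below are taken from this article.

MAX_RETRIES = 2
MAX_THREADS = 3

keyword_list = [
    {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
filename = "animation-and-comics"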

How To Architect Our SimilarWeb Scraper

Scraping SimilarWeb can be challenging. As soon as you really try to do anything with it, you're prompted to create an account in order to gain any real access to the site.

SimilarWeb Free Trial

This not only makes the site difficult to scrape, it makes it difficult to even access from a traditional browser! Even though we're prompted to create an account, we can still perform some actions before we wind up getting blocked.

With the ScrapeOps Proxy Aggregator, we can use rotating proxies to get past this. We're getting blocked based on our IP address, and it's much more difficult to block us if we're using a fresh IP with each request.

Our SimilarWeb scraper will follow a similar architecture to most of our other scraping projects from our "How to Scrape" series. We'll need to build both a crawler and a scraper.

  • Our crawler will look up the top sites in a particular category.
  • Then, our scraper will go through and scrape the competitors and their respective info for each of these top ranked sites.

We'll use an iterative building process to add the following features:

  1. Lookup a particular site and parse its data.
  2. Store the parsed data inside an easy-to-manage CSV file.
  3. Concurrently search multiple categories.
  4. Use the ScrapeOps Proxy Aggregator to get past these anti-bots and free trial prompts.

Our scraper will be built in the following iterations:

  1. Look up and parse the competitors from a row in the CSV file generated earlier.
  2. Save the competitors of each site to a new CSV report.
  3. Run steps 1 and 2 concurrently.
  4. Once again, use the ScrapeOps Proxy Aggregator to get past any anti-bots and free trial prompts.

Understanding How To Scrape SimilarWeb

We need to understand SimilarWeb at a high level before we start writing any serious code.

In these next few sections, let's take a look at exactly how we're going to access our information and how to pick it from the page.


Step 1: How To Request SimilarWeb Pages

Like everything else on the web, we begin with a simple GET request. The front page of SimilarWeb isn't very useful to us, so we'll query a specific endpoint.

In this case, we'll be looking up the top 50 humor sites. Here is our URL:

https://www.similarweb.com/top-websites/arts-and-entertainment/humor/

The URL gets laid out in the following structure:

https://www.similarweb.com/top-websites/{CATEGORY}/{SUBCATEGORY}/

For any specific search, we need both a category and a subcategory. In this case, our category is "arts-and-entertainment" while our subcategory is "humor".

You can view a screenshot of the page below.

Similarweb Top Websites

When you view the page for a specific site, the URL looks like this:

https://www.similarweb.com/website/pikabu.ru/

The layout goes as follows:

https://www.similarweb.com/website/{NAME_OF_SITE}/

Similarweb Pikabu.ru Page
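
To make these layouts concrete, here is a minimal sketch of two helper functions (the names are our own, not part of the final scraper) that build these URLs from a category/subcategory pair or a site name:

def top_websites_url(category, subcategory):
    return f"https://www.similarweb.com/top-websites/{category}/{subcategory}/"

def website_url(site_name):
    return f"https://www.similarweb.com/website/{site_name}/"

print(top_websites_url("arts-and-entertainment", "humor"))
# https://www.similarweb.com/top-websites/arts-and-entertainment/humor/
print(website_url("pikabu.ru"))
# https://www.similarweb.com/website/pikabu.ru/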


Step 2: How To Extract Data From SimilarWeb Results and Pages

Extracting the data can be a little tricky. However, as long as we have access to the site, it's completely doable. For starters, some of our content is loaded dynamically.

To load our dynamic content, we need to use the wait parameter when talking to ScrapeOps. After we have our loaded page, we just need to find the information using its CSS class.

For the results pages, each row has a class of top-table__row. We can find all these rows and easily extract their data from there.

Similarweb Top Websites HTML Inspection

To extract our competitors, we extract div elements with the class of wa-competitors__list-item. Each of these div tags holds all the data for each competitor.

Similarweb Pikabu.ru Page HTML Inspection

On top of these pages, we need to be aware of the modal that SimilarWeb uses to block us. If this modal is present, we need to retry our request. As you can see in the image below, it's a div with a class of wa-limit-modal.

Similarweb Create Account HTML Inspection
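
Putting those selectors together, here is a minimal sketch (the helper names are ours) of the checks this article relies on; it assumes html is the text of a page we just fetched:

from bs4 import BeautifulSoup

def page_is_blocked(html):
    # SimilarWeb's sign-up modal uses the wa-limit-modal class.
    # If it's present, we should retry the request with a fresh IP.
    soup = BeautifulSoup(html, "html.parser")
    return soup.find("div", class_="wa-limit-modal") is not None

def extract_rows(html):
    soup = BeautifulSoup(html, "html.parser")
    # Ranking rows on the top-websites pages
    top_rows = soup.find_all("tr", class_="top-table__row")
    # Competitor cards on an individual website page
    competitors = soup.find_all("div", class_="wa-competitors__list-item")
    return top_rows, competitors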


Step 3: Geolocated Data

ScrapeOps gives us the ability to control our geolocation via the country parameter. However, with SimilarWeb, we don't want to control our geolocation.

Instead of controlling our location, we want as many IP addresses as possible to reduce our likelihood of getting blocked and asked to sign in/sign up like you saw in the previous section.

Not controlling our location gives us a much larger pool of IP addresses to use.
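
If you ever did want to pin requests to a specific region, the ScrapeOps payload can carry a country parameter. Below is a hedged sketch of how the proxy helper used later in this article could accept it (assuming API_KEY is loaded from config.json as in the rest of the code); for SimilarWeb, we simply leave it out.

from urllib.parse import urlencode

def get_scrapeops_url(url, country=None):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "wait": 3000
    }
    if country is not None:
        # e.g. country="us" restricts us to one region's IP pool,
        # which is exactly what we want to avoid on SimilarWeb
        payload["country"] = country
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)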


Setting Up Our SimilarWeb Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir similarweb-scraper

cd similarweb-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4
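
Finally, create a config.json file in the project folder. Every version of the script in this article loads its ScrapeOps API key from this file:

{"api_key": "your-super-secret-api-key"}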

Build A SimilarWeb Search Crawler

Time to start building! In the next few sections, we'll go through and build our crawler piece by piece.

We'll start with a parser. Next we'll add data storage. Then, we'll add in concurrency. Finally, we'll add proxy integration.


Step 1: Create Simple Search Data Parser

Parsing is the first step of our scrape. In the code below, we create our basic script and add structure like error handling and retries. Most importantly, we implement our base parsing function.

To see how our data gets extracted, pay close attention to scrape_search_results().

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, retries=3):
    url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            rows = soup.find_all("tr", class_="top-table__row")

            rank = 1
            for row in rows:
                link_holder = row.find("a", class_="tw-table__compare")
                site_name = link_holder.text
                link = f"https://www.similarweb.com/website/{site_name}/"

                rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
                rank_change = 0
                up_or_down = rank_change_holder.find("span").get("class")[1]
                if "change--up" in up_or_down:
                    rank_change += int(rank_change_holder.text)
                elif "change--down" in up_or_down:
                    rank_change -= int(rank_change_holder.text)

                average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
                pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
                bounce_rate = row.find("span", class_="tw-table__bounce-rate").text

                search_data = {
                    "name": site_name,
                    "url": link,
                    "rank": rank,
                    "rank_change": rank_change,
                    "average_visit": average_visit,
                    "pages_per_visit": pages_per_visit,
                    "bounce_rate": bounce_rate
                }

                rank += 1

                print(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, retries=3):
    for keyword in keywords:
        scrape_search_results(keyword, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
    aggregate_files = []

    ## Job Processes
    filename = "arts-and-entertainment"

    start_scrape(keyword_list, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")

When parsing the page, this is where we extract our data (a small standalone demo of the rank-change check follows this list):

  • First, we find all our rows, rows = soup.find_all("tr", class_="top-table__row").
  • We find our link_holder with row.find("a", class_="tw-table__compare").
  • Using the link_holder object, we extract our site_name and construct our link.
  • rank_change_holder.find("span").get("class")[1] is used to find whether the rank went up or down.
  • We then find the average visit with row.find("span", class_="tw-table__avg-visit-duration").text.
  • float(row.find("span", class_="tw-table__pages-per-visit").text) finds our pages_per_visit.
  • Finally, we get our bounce_rate with row.find("span", class_="tw-table__bounce-rate").text.
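
To make the rank-change check concrete, here is a small standalone demo using a made-up HTML fragment (the real markup on SimilarWeb may differ slightly, but the class-based up/down logic works the same way):

from bs4 import BeautifulSoup

# Hypothetical fragment mimicking a rank-change cell
html = """
<td class="top-table__column top-table__column--rank-change">
  <span class="change change--up"></span>3
</td>
"""
cell = BeautifulSoup(html, "html.parser").find("td")
direction = cell.find("span").get("class")[1]  # "change--up"
change = int(cell.text)                        # 3
if "change--down" in direction:
    change = -change
print(change)  # 3 (would be -3 for a change--down cell)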

Step 2: Storing the Scraped Data

Once we've got our data, we need to store it. To store our data, we need to make a couple of classes. First up is a dataclass, SearchData.

SearchData will be used to represent individual objects from our search results. Once we have a SearchData object, we need to pass it into a DataPipeline.

Our DataPipeline is used to open a pipe to a CSV file. The pipeline filters out duplicates by name and then saves all non-duplicate objects to a CSV file.

Here is our SearchData class. We use this to represent individual ranking results.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    rank: int = 0
    rank_change: int = 0
    average_visit: str = ""
    pages_per_visit: float = 0.0
    bounce_rate: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here is our DataPipeline.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
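
Before wiring the pipeline into the crawl, here is a tiny usage sketch (the filename and values are made up) showing how duplicates get dropped and how close_pipeline() flushes anything left in the queue:

pipeline = DataPipeline(csv_filename="test-output.csv")
pipeline.add_data(SearchData(name="pikabu.ru", url="https://www.similarweb.com/website/pikabu.ru/", rank=1))
# Same name again: is_duplicate() catches it and the item is dropped
pipeline.add_data(SearchData(name="pikabu.ru", url="https://www.similarweb.com/website/pikabu.ru/", rank=1))
pipeline.close_pipeline()  # writes the remaining queued items to test-output.csv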

When we put it all together, we need to open a new DataPipeline and pass it into start_scrape(). start_scrape() then passes the pipeline into our parsing function.

Instead of printing our parsed data, we now pass that into the pipeline. Once we're finished parsing the results, we go ahead and close the DataPipeline.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")

rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"

rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)

average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text

search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, data_pipeline=None, retries=3):
for keyword in keywords:
scrape_search_results(keyword, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []

## Job Processes
filename = "arts-and-entertainment"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • Each item in our results is represented in our code as SearchData.
  • These SearchData objects then get passed into our DataPipeline and saved to a CSV file.

Step 3: Adding Concurrency

Now, we need to add concurrency. We'll use ThreadPoolExecutor to add support for multithreading. Once we can open multiple threads, we can use those threads to run our parsing function on multiple pages concurrently.

Here is our start_scrape() function adjusted for concurrency.

def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )
  • scrape_search_results is the function we'd like to call using multiple threads.
  • keywords is the array of things we'd like to search.
  • All other args to scrape_search_results get passed in as arrays (see the short demo below).
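
As a quick illustration of how executor.map() lines those arrays up, here is a small, self-contained demo with a dummy function standing in for scrape_search_results:

import concurrent.futures

def fake_scrape(keyword, data_pipeline, retries):
    # Stand-in for scrape_search_results: shows which arguments line up per call
    print(f"keyword={keyword}, pipeline={data_pipeline}, retries={retries}")

keywords = ["humor", "animation-and-comics"]
pipeline = "shared-pipeline"  # placeholder for a real DataPipeline instance

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # The i-th call receives the i-th element of each iterable
    executor.map(fake_scrape, keywords, [pipeline] * len(keywords), [3] * len(keywords))

With that in place, here is our full crawler code with concurrency added.
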
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")

rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"

rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)

average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text

search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []

## Job Processes
filename = "arts-and-entertainment"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

We now have the concurrency capability to crawl multiple categories at once.


Step 4: Bypassing Anti-Bots

To properly scrape SimilarWeb, we need a ton of IP addresses.

To get as many addresses as possible, we're going to use just three parameters: api_key, url, and wait. This tells ScrapeOps that we want to wait 3 seconds (3000 milliseconds) for content to render and that we don't care which country we're routed through.

This gives us the largest pool of potential IP addresses because we can be routed through any server that ScrapeOps supports.

def get_scrapeops_url(url):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "wait": 3000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
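
For reference, this is how a parsing function uses the helper. The example URL is the humor endpoint from earlier, and the commented-out proxy URL just shows the general shape of the result:

url = "https://www.similarweb.com/top-websites/arts-and-entertainment/humor/"
proxy_url = get_scrapeops_url(url)
# proxy_url looks something like:
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.similarweb.com%2F...&wait=3000
response = requests.get(proxy_url)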

The code below contains our production ready crawler.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")

rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"

rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)

average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text

search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []

## Job Processes
filename = "arts-and-entertainment"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 5: Production Run

Alright! It's time to run this code in production. As you've noticed, we have MAX_THREADS set to 5. Since we're only searching 2 categories, ThreadPoolExecutor will only use 2 of those threads and finish out the job.

In the next half of our article, when we write the scraper, we'll take advantage of all 5 threads.

Here is our main.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []

## Job Processes
filename = "arts-and-entertainment"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Here are the results from our crawl. They are all over the place. On one run, it took 20.645 seconds. On the next, it took 66.588 seconds. This shows that when SimilarWeb begins blocking us, ScrapeOps looks for new servers to use until each request is successful.

Crawler Performance 1 Terminal

Crawler Performance 2 Terminal


Build A SimilarWeb Scraper

Now that we're running a proper crawl and saving the results, we need to do something with those results. In this section, we'll go through and scrape the competitors to each site we extracted during the crawl.

The scraper needs to do the following:

  1. Read the CSV into an array.
  2. Parse the websites from the array.
  3. Store the competitor data from the parsing stage.
  4. Run steps 2 and 3 concurrently for faster results.
  5. Integrate with the ScrapeOps Proxy Aggregator to get past anti-bots and other roadblocks.

Step 1: Create Simple Website Data Parser

Just like before, we're going to start with a parsing function. This one will find all of the competitor objects on the page and extract their data.

def process_website(row, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")
            else:
                logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            blocked = soup.find("div", class_="wa-limit-modal")
            if blocked:
                raise Exception(f"Blocked")
            competitors = soup.find_all("div", class_="wa-competitors__list-item")

            for competitor in competitors:
                site_name = competitor.find("span", class_="wa-competitors__list-item-title").text
                link = f"https://www.similarweb.com/website/{site_name}/"
                affinity = competitor.find("span", class_="app-progress__value").text
                target_spans = competitor.find_all("span", "wa-competitors__list-column")

                monthly_visits = target_spans[2].text
                category = target_spans[3].text
                category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0"))

                competitor_data = {
                    "name": site_name,
                    "url": link,
                    "affinity": affinity,
                    "monthly_visits": monthly_visits,
                    "category": category,
                    "category_rank": category_rank
                }

                print(competitor_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

This time, during our parse, we execute these steps:

  • Find all of the competitor rows: soup.find_all("div", class_="wa-competitors__list-item").
  • Iterate through the competitor rows.
  • For each competitor, we pull the following:
    • site_name
    • affinity
    • monthly_visits
    • category
    • category_rank (see the cleanup sketch after this list)
    • We construct the url by once again formatting the site_name.
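
One cleanup step worth calling out: the category rank comes through as text along the lines of "#1,234", or "--" when SimilarWeb shows no rank, so we strip those characters before converting to an int. Here is a minimal sketch of that normalization (the helper name is ours; the replace chain mirrors process_website):

def parse_category_rank(raw):
    # "#1,234" -> 1234, "--" -> 0
    return int(raw.replace("#", "").replace(",", "").replace("--", "0"))

print(parse_category_rank("#1,234"))  # 1234
print(parse_category_rank("--"))      # 0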

Step 2: Loading URLs To Scrape

We have our parsing function, but it needs a url to work. Here, we'll add another function that reads urls from the CSV and calls process_website() on each row from the file.

Here is our process_results() function.

def process_results(csv_file, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_website(row, retries=retries)

You can see how it all fits together in the code below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")

rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"

rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)

average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text

search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)


def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
else:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
blocked = soup.find("div", class_="wa-limit-modal")
if blocked:
raise Exception(f"Blocked")
competitors = soup.find_all("div", class_="wa-competitors__list-item")

for competitor in competitors:
site_name = competitor.find("span", class_="wa-competitors__list-item-title").text
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find("span", class_="app-progress__value").text
target_spans = competitor.find_all("span", "wa-competitors__list-column")

monthly_visits = target_spans[2].text
category = target_spans[3].text
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0"))

competitor_data = {
"name": site_name,
"url": link,
"affinity": affinity,
"monthly_visits": monthly_visits,
"category": category,
"category_rank": category_rank
}

print(competitor_data)

success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_website(row, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []

## Job Processes
filename = "arts-and-entertainment"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, retries=MAX_RETRIES)
  • process_results() reads our CSV into an array.
  • For each row of the file, we run process_website() on the row.

Step 3: Storing the Scraped Data

Without storage, there wouldn't be much point in scraping to begin with. We've already got the DataPipeline; we just need a dataclass to feed into it. We're going to create a new one called CompetitorData. It's very much like our SearchData.

Here is our CompetitorData class.

@dataclass
class CompetitorData:
    name: str = ""
    url: str = ""
    affinity: str = ""
    monthly_visits: str = ""
    category: str = ""
    category_rank: int = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In our revised code below, we open another DataPipeline inside our parsing function and we pass CompetitorData into it.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")

rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"

rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)

average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text

search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)


def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
else:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
blocked = soup.find("div", class_="wa-limit-modal")
if blocked:
raise Exception(f"Blocked")
competitors = soup.find_all("div", class_="wa-competitors__list-item")

competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for competitor in competitors:
site_name = competitor.find("span", class_="wa-competitors__list-item-title").text
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find("span", class_="app-progress__value").text
target_spans = competitor.find_all("span", "wa-competitors__list-column")

monthly_visits = target_spans[2].text
category = target_spans[3].text
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0"))

competitor_data = CompetitorData(
name=site_name,
url=link,
affinity=affinity,
monthly_visits=monthly_visits,
category=category,
category_rank=category_rank
)

competitor_pipeline.add_data(competitor_data)

competitor_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_website(row, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []

## Job Processes
filename = "arts-and-entertainment"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, retries=MAX_RETRIES)
  • CompetitorData represents each competitor we extract from a site's page.
  • We open a new DataPipeline inside our parsing function and feed these CompetitorData objects into it, so each crawled site gets its own competitor CSV (see the short sketch below).
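
To make that hand-off concrete, here is a small, self-contained sketch of how one competitor becomes a CSV row. It re-declares a stripped-down CompetitorData with the same fields as the listing above, and the example values (example.com, its affinity, visits, and rank) are made up purely for illustration; inside the real pipeline, save_to_csv() builds the header from fields() and each row from asdict() in the same way.

from dataclasses import dataclass, fields, asdict

# Stripped-down copy of the CompetitorData dataclass from the listing above.
@dataclass
class CompetitorData:
    name: str = ""
    url: str = ""
    affinity: str = ""
    monthly_visits: str = ""
    category: str = ""
    category_rank: int = 0

# Hypothetical values, just to show the shape of one competitor row.
example = CompetitorData(
    name="example.com",
    url="https://www.similarweb.com/website/example.com/",
    affinity="100%",
    monthly_visits="1.2M",
    category="Arts and Entertainment",
    category_rank=42
)

# The DataPipeline derives the CSV header from fields() and each row from asdict().
print([field.name for field in fields(example)])  # header columns
print(asdict(example))                            # one CSV row as a dict

Running this prints the six column names followed by a dict holding the example values, which is exactly the structure DictWriter expects.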

Step 4: Adding Concurrency

We now need to add concurrency. This time, instead of crawling multiple categories at once, we need to run our parsing function on many rows of the crawl CSV simultaneously.

To accomplish this, we're going to refactor process_results() to take advantage of multiple threads using ThreadPoolExecutor.

Here is our multithreaded process_results().

def process_results(csv_file, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_website,
                reader,
                [retries] * len(reader)
            )
  • process_website is the function we want to call on multiple threads.
  • reader is the list of rows we want to process, one call per row.
  • retries is passed in as a list the same length as reader, so every call gets the same retry count.

Every argument to process_website goes into executor.map() as a list of equal length; the executor then unpacks one element from each list into each call, as the toy example below shows.
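
If the way executor.map() spreads those lists across calls feels abstract, the toy example below (not part of the scraper; greet() and its arguments are invented for illustration) shows how the i-th element of each list becomes the arguments of the i-th call on the thread pool.

import concurrent.futures

# Hypothetical worker, standing in for process_website().
def greet(name, punctuation, repeats):
    return f"Hello, {name}{punctuation}" * repeats

names = ["humor", "animation-and-comics", "tv-shows"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        greet,
        names,                # first argument: one element per call
        ["!"] * len(names),   # second argument: same value for every call
        [2] * len(names)      # third argument: same value for every call
    )
    # executor.map() yields results in the same order as the input lists.
    for result in results:
        print(result)

This is the same pattern start_scrape() already uses for keywords, and it is why retries has to be wrapped in a list the same length as reader.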

Here is our full code up to this point.

import os
import csv
import requests
import json
import logging
import time  # used by DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
rows = soup.find_all("tr", class_="top-table__row")

rank = 1
for row in rows:
link_holder = row.find("a", class_="tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"

rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find("span").get("class")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)

average_visit = row.find("span", class_="tw-table__avg-visit-duration").text
pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text)
bounce_rate = row.find("span", class_="tw-table__bounce-rate").text

search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank+=1

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)


def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
else:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
blocked = soup.find("div", class_="wa-limit-modal")
if blocked:
raise Exception(f"Blocked")
competitors = soup.find_all("div", class_="wa-competitors__list-item")

competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for competitor in competitors:
site_name = competitor.find("span", class_="wa-competitors__list-item-title").text
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find("span", class_="app-progress__value").text
target_spans = competitor.find_all("span", "wa-competitors__list-column")

monthly_visits = target_spans[2].text
category = target_spans[3].text
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0"))

competitor_data = CompetitorData(
name=site_name,
url=link,
affinity=affinity,
monthly_visits=monthly_visits,
category=category,
category_rank=category_rank
)

competitor_pipeline.add_data(competitor_data)

competitor_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_website,
reader,
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
aggregate_files = []

## Job Processes
filename = "arts-and-entertainment"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)