How to Scrape SimilarWeb With Selenium
SimilarWeb is an excellent resource for discovering valuable details about any website, including rank, category, rank change, average visit duration, pages per visit, and bounce rate. These metrics offer essential insight into how users behave when they visit a site. In this tutorial, we will explore how to scrape SimilarWeb with Selenium.
- TLDR: How to Scrape SimilarWeb
- How To Architect Our Scraper
- Understanding How To Scrape SimilarWeb
- Setting Up Our SimilarWeb Scraper
- Build A SimilarWeb Search Crawler
- Build A SimilarWeb Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape SimilarWeb
Scraping SimilarWeb can be quite challenging. To begin with, SimilarWeb restricts access after a certain point, which makes it essential to have a proxy that rotates IP addresses.
If you just want to scrape SimilarWeb, use the scraper provided below.
- First, create a new project folder and add a config.json file.
- Once the config file is created, add your ScrapeOps API key to it: {"api_key": "your-super-secret-api-key"}.
- Afterwards, copy and paste the code below into a new Python file.
import os
import csv
import json
import time
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
from dataclasses import dataclass, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Setup Selenium WebDriver
def setup_driver():
options = Options()
options.add_argument("--headless") # Run in headless mode for efficiency
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Function to scrape search results (fully Selenium-based)
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
# Initialize WebDriver and load page
driver = setup_driver()
scrapeops_proxy_url = get_scrapeops_url(url)
driver.get(scrapeops_proxy_url)
time.sleep(3) # Allow page to load
logger.info(f"Opened URL: {url}")
# Find all rows of the search results table
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")
rank = 1
for row in rows:
site_name = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
# Rank change processing
rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find_element(By.TAG_NAME, "span").get_attribute("class").split()[-1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text.strip())
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text.strip())
average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip()
pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text.strip())
bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip()
# Create data object
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank += 1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max retries exceeded: {retries}")
# Function to process and scrape all search results concurrently
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
# Function to process websites (Selenium-based) and extract competitor data
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
driver = setup_driver()
scrapeops_proxy_url = get_scrapeops_url(url)
driver.get(scrapeops_proxy_url)
time.sleep(3) # Allow page to load
# Check if blocked by a modal or warning
try:
blocked_modal = driver.find_element(By.CSS_SELECTOR, "div.wa-limit-modal")
if blocked_modal:
raise Exception("Blocked by modal")
except:
pass # No blocking modal
# Extract competitor data
competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")
competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}_competitors.csv")
for competitor in competitors:
site_name = competitor.find_element(By.CSS_SELECTOR, "span.wa-competitors__list-item-title").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip()
target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column")
monthly_visits = target_spans[2].text.strip()
category = target_spans[3].text.strip()
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip())
competitor_data = CompetitorData(
name=site_name,
url=link,
affinity=affinity,
monthly_visits=monthly_visits,
category=category,
category_rank=category_rank
)
competitor_pipeline.add_data(competitor_data)
competitor_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_website,
reader,
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Example keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
aggregate_files = []
# Crawl and save results
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
# Process each CSV file
for file in aggregate_files:
process_results(file,max_threads=MAX_THREADS, retries=MAX_RETRIES)
To adjust your results, you can modify the following (a sample alternative configuration is sketched after this list):
- MAX_THREADS: Defines how many concurrent threads are used for processing and scraping tasks.
- MAX_RETRIES: Determines the number of retries the script will make if a request fails, such as due to a non-200 status code or network issues.
- keyword_list: A list of dictionaries, each containing a "category" and "subcategory", which specify the type of websites to be scraped from SimilarWeb.
- filename: The base name used to generate the CSV file where the scraped data will be saved.
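For instance, a run against two hypothetical news subcategories with more aggressive retries might be configured like this (the slugs below are assumptions; copy the exact category and subcategory segments from the SimilarWeb URL you want to crawl):
MAX_RETRIES = 5        # retry each failed page up to 5 times
MAX_THREADS = 3        # crawl at most 3 categories at once

# The slugs come straight from the top-websites URL, e.g.
# https://www.similarweb.com/top-websites/news-and-media/newspapers/
keyword_list = [
    {"category": "news-and-media", "subcategory": "newspapers"},
    {"category": "news-and-media", "subcategory": "magazines-and-e-zines"},
]

filename = "news-and-media"   # results are written to news-and-media.csv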
How To Architect Our SimilarWeb Scraper
Scraping SimilarWeb can be difficult. Once you attempt to do anything substantial with it, the site prompts you to create an account to access its full features.
Not only is the site difficult to scrape, it is hard to use through a normal browser as well. We are repeatedly prompted to sign up for an account, although there are still a few actions we can take before getting blocked each time.
The ScrapeOps Proxy Aggregator gets us around this because the blocks are based on our IP address: with rotating proxies, each request goes out from a new IP, which makes us much harder to block.
Our SimilarWeb scraper will follow a similar structure to other scraping projects in our "How to Scrape" series. Both a crawler and a scraper will need to be built.
- The crawler will search for the top sites in a particular category, and
- The scraper will collect data from competitors and their relevant information for each of these top-ranked sites.
We’ll take an iterative approach to build the following features:
- Search for a particular site and extract its data.
- Store the extracted data in an easily manageable CSV file.
- Simultaneously search multiple categories.
- Use ScrapeOps Proxy Aggregator to bypass anti-bots and free trial limitations.
Our scraper will be developed in these steps:
- Find and extract competitors from a row in the CSV file created earlier.
- Save competitors of each site into a new CSV report.
- Run steps 1 and 2 concurrently.
- Once again, use ScrapeOps Proxy Aggregator to bypass anti-bots and free trial prompts.
Understanding How To Scrape SimilarWeb
Before diving into writing any serious code, it’s important to gain a high-level understanding of SimilarWeb.
Over the next few sections, we’ll explore how to retrieve the necessary information and how to extract it from the page.
Step 1: How To Request SimilarWeb Pages
As with most web interactions, the process starts with a basic GET request. The SimilarWeb homepage isn't particularly useful for our purposes, so we’ll target a specific endpoint.
For this example, we’ll be retrieving the top 50 humor websites.
Here’s the URL:
https://www.similarweb.com/top-websites/arts-and-entertainment/humor/
The URL follows this structure:
https://www.similarweb.com/top-websites/{CATEGORY}/{SUBCATEGORY}/
Each search requires both a category and a subcategory. In this instance, the category is "arts-and-entertainment," and the subcategory is "humor." You can see a snapshot of the page below.
When you visit the page for a particular site, the URL appears like this:
https://www.similarweb.com/website/pikabu.ru/
The format is structured as follows:
https://www.similarweb.com/website/{NAME_OF_SITE}/
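To make both patterns concrete, here is a small sketch that builds the two kinds of URLs in Python (the helper names are our own):
# Helpers for the two URL formats used throughout this guide.
def top_websites_url(category, subcategory):
    return f"https://www.similarweb.com/top-websites/{category}/{subcategory}/"

def website_url(site_name):
    return f"https://www.similarweb.com/website/{site_name}/"

print(top_websites_url("arts-and-entertainment", "humor"))
# -> https://www.similarweb.com/top-websites/arts-and-entertainment/humor/
print(website_url("pikabu.ru"))
# -> https://www.similarweb.com/website/pikabu.ru/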
Step 2: How To Extract Data From SimilarWeb Results and Pages
Extracting the data can be somewhat tricky. However, this is completely doable if we have access to the site. To begin with, some of our content loads dynamically.
When talking to ScrapeOps, we need to use the wait
parameter to load our dynamic content. Once the page is loaded, we can simply locate the information by using its CSS class.
For the results pages, each row has a class of top-table__row
. From there, we can locate all these rows and extract their data with ease.
To extract our competitors, we first target div elements that have the class wa-competitors__list-item
. These div tags contain all the information for each individual competitor.
At the top of these pages, we need to stay mindful of the modal that SimilarWeb uses to block our access. If this modal appears, we need to attempt the request again. As shown in the image below, it's a div with the class wa-limit-modal
.
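Putting those selectors together, a minimal Selenium sketch of the three lookups we rely on might look like this (it assumes driver already holds a loaded page):
from selenium.webdriver.common.by import By

# Rows on a top-websites results page.
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")

# Competitor cards on an individual website page.
competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")

# The blocking modal; if it shows up, we retry the request.
if driver.find_elements(By.CSS_SELECTOR, "div.wa-limit-modal"):
    raise Exception("Blocked by modal")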
Step 3: Geolocated Data
ScrapeOps allows us to manage our geolocation using the country parameter. With SimilarWeb, however, we prefer not to pin our geolocation at all.
Rather than managing our location, we aim to have as many IP addresses as possible to decrease the chances of being blocked or prompted to sign in/sign up, as mentioned in the previous section.
By not managing our location, we have access to a significantly larger pool of IP addresses.
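For comparison, here is a hedged sketch of what the proxy payload would look like with and without the country parameter; we stick with the second form so requests can route through any ScrapeOps server:
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"
target = "https://www.similarweb.com/top-websites/arts-and-entertainment/humor/"

# Geolocated: restricts the pool to IPs from one country (example value).
geolocated_payload = {"api_key": API_KEY, "url": target, "country": "us", "wait": 3000}

# What we actually use: no country key, so any available IP can be assigned.
open_payload = {"api_key": API_KEY, "url": target, "wait": 3000}

proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(open_payload)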
Setting Up Our SimilarWeb Scraper Project
You can begin by executing the commands below to set up.
Create a New Project Folder
First, create a new project folder using the command:
mkdir similarweb-scraper
Then, move into the newly created folder by running:
cd similarweb-scraper
Create a New Virtual Environment
Next, set up a virtual environment with the command:
python -m venv venv
Activate the Environment
To activate the virtual environment, use:
source venv/bin/activate
Install Our Dependencies
Afterwards, install the required dependencies by running:
pip install selenium
pip install webdriver-manager
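Every script in this guide also expects a config.json file holding your ScrapeOps API key in the project root. You can create it by hand or with a quick one-off script like this (the key below is a placeholder):
import json

# Write a placeholder config.json; swap in your real ScrapeOps API key.
with open("config.json", "w") as config_file:
    json.dump({"api_key": "your-super-secret-api-key"}, config_file)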
Build A SimilarWeb Search Crawler
It's time to begin building! In the following sections, we'll construct our crawler step by step.
We'll start with a parser and then move on to adding data storage.
After that, we'll implement concurrency and, lastly, integrate proxy support.
Step 1: Create Simple Search Data Parser
The first step in our scraping process is parsing.
In the code provided below, we set up our basic script and introduce features such as error handling and retries.
The key part is the implementation of our base parsing function. To observe how the data is extracted, focus on scrape_search_results()
.
import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import time
API_KEY = ""
# Load the API key from the config file
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Function to set up the Selenium WebDriver with necessary options
def setup_driver():
options = Options()
options.add_argument("--headless") # Run in headless mode
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
# Main scraping function using Selenium
def scrape_search_results(keyword, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
# Set up and start the WebDriver
driver = setup_driver()
driver.get(url)
logger.info(f"Received page from: {url}")
# Wait for the page to load fully
time.sleep(3)
# Find all rows for the top websites table
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")
rank = 1
for row in rows:
link_holder = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare")
site_name = link_holder.text
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column.top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find_element(By.CSS_SELECTOR, "span").get_attribute("class").split(" ")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text
pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text)
bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text
# Collecting scraped data
search_data = {
"name": site_name,
"url": link,
"rank": rank,
"rank_change": rank_change,
"average_visit": average_visit,
"pages_per_visit": pages_per_visit,
"bounce_rate": bounce_rate
}
rank += 1
print("search data: ",search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries += 1
finally:
# Close the WebDriver after each attempt
driver.quit()
if not success:
raise Exception(f"Max retries exceeded for: {url}")
# Function to start the scraping process for a list of keywords
def start_scrape(keywords, retries=3):
for keyword in keywords:
scrape_search_results(keyword, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Input list of keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
# Start scraping process
start_scrape(keyword_list, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- First, we locate all rows using rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row").
- Then, we retrieve the link_holder with link_holder = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").
- From the link_holder, we extract the site_name and construct our link.
- We determine whether the rank has increased or decreased by using rank_change_holder.find_element(By.CSS_SELECTOR, "span").get_attribute("class").split(" ")[1].
- The average visit duration is obtained through row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.
- The number of pages_per_visit is retrieved with float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text).
- Lastly, the bounce rate is collected using row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.
Step 2: Storing the Scraped Data
After we have our data, we need to store it. To do that, we will create a few classes. The first is a dataclass called SearchData, which represents an individual object from the search results.
Once the SearchData
object is created, it needs to be passed into a DataPipeline. The DataPipeline is responsible for opening a pipe to a CSV file. It removes duplicates by name and then saves all the non-duplicate objects to the CSV file.
Below is our SearchData
class, which we use to represent individual ranking results.
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name).strip()
setattr(self, field.name, value)
Here is our DataPipeline
.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
- When we put everything together, we open a new DataPipeline and pass it into start_scrape().
- start_scrape() then sends the pipeline to our parsing function.
- Rather than printing the parsed data, we now send it into the pipeline.
- After parsing the results, we close the DataPipeline.
import os
import csv
import json
import logging
import time
from dataclasses import dataclass, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
API_KEY = ""
# Load API key from config
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Dataclass representing individual search results
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name).strip()
setattr(self, field.name, value)
# Class for handling data storage to CSV
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Function to set up Selenium WebDriver
def setup_driver():
options = Options()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
# Function to scrape search results using Selenium
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
# Setup and start Selenium WebDriver
driver = setup_driver()
driver.get(url)
logger.info(f"Received page from: {url}")
time.sleep(3) # Wait for the page to load
# Find rows in the search results
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")
rank = 1
for row in rows:
link_holder = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare")
site_name = link_holder.text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column.top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find_element(By.CSS_SELECTOR, "span").get_attribute("class").split(" ")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip()
pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text)
bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip()
# Create a SearchData object
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
# Add data to the pipeline
data_pipeline.add_data(search_data)
rank += 1
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries - tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max retries exceeded for: {url}")
# Function to start the scraping process for a list of keywords
def start_scrape(keywords, data_pipeline=None, retries=3):
for keyword in keywords:
scrape_search_results(keyword, data_pipeline=data_pipeline, retries=retries)
# Main execution
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Input list of keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
# Initialize DataPipeline
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
# Start the scraping process
start_scrape(keyword_list, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
# Close the pipeline after scraping
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
In our code, each item in the results is represented as SearchData
. These SearchData objects are then passed into our DataPipeline and stored in a CSV file.
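To see that flow in isolation, here is a minimal usage sketch of the two classes (the values are made up):
# Push two SearchData objects through a pipeline and flush them to disk.
pipeline = DataPipeline(csv_filename="example.csv")

pipeline.add_data(SearchData(name="pikabu.ru",
                             url="https://www.similarweb.com/website/pikabu.ru/",
                             rank=1, rank_change=2,
                             average_visit="00:09:15",
                             pages_per_visit=7.5,
                             bounce_rate="25%"))

# A repeated name is caught by is_duplicate() and dropped with a warning.
pipeline.add_data(SearchData(name="pikabu.ru"))

pipeline.close_pipeline()   # writes the single surviving row to example.csv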
Step 3: Adding Concurrency
Now, we need to incorporate concurrency. To add multithreading support, we’ll utilize ThreadPoolExecutor
.
Once we have the ability to open several threads, we can employ those threads to run our parsing function on multiple pages simultaneously.
Below is our start_scrape()
function modified for concurrency.
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
We want to call the function scrape_search_results
by utilizing multiple threads. The array keywords
contains the items we wish to search for. All additional arguments to scrape_search_results
are passed in as arrays.
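Conceptually, executor.map zips those lists together and supplies one element from each per call. The stand-in below (demo replaces scrape_search_results purely for illustration) shows how the arguments line up; the full script for this step follows.
import concurrent.futures

def demo(keyword, data_pipeline, retries):
    # Stand-in for scrape_search_results(): just shows how arguments line up.
    print(keyword, data_pipeline, retries)

keywords = [
    {"category": "arts-and-entertainment", "subcategory": "humor"},
    {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Equivalent to demo(keywords[0], "pipeline", 3) and demo(keywords[1], "pipeline", 3),
    # except the calls run on separate threads.
    executor.map(demo, keywords, ["pipeline"] * len(keywords), [3] * len(keywords))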
import os
import csv
import json
import logging
import time
from dataclasses import dataclass, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
API_KEY = ""
# Load API key from config
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Dataclass representing individual search results
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name).strip()
setattr(self, field.name, value)
# Class for handling data storage to CSV
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Function to set up Selenium WebDriver
def setup_driver():
options = Options()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
# Function to scrape search results using Selenium
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
# Setup and start Selenium WebDriver
driver = setup_driver()
driver.get(url)
logger.info(f"Received page from: {url}")
time.sleep(3) # Wait for the page to load
# Find rows in the search results
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")
rank = 1
for row in rows:
link_holder = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare")
site_name = link_holder.text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column.top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find_element(By.CSS_SELECTOR, "span").get_attribute("class").split(" ")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip()
pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text)
bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip()
# Create a SearchData object
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
# Add data to the pipeline
data_pipeline.add_data(search_data)
rank += 1
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries - tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max retries exceeded for: {url}")
# Function to start the scraping process for a list of keywords
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
# Main execution
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Input list of keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
# Initialize DataPipeline
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
# Start the scraping process
start_scrape(keyword_list, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS, retries=MAX_RETRIES)
# Close the pipeline after scraping
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
We are now capable of crawling several categories concurrently.
Step 4: Bypassing Anti-Bots
To scrape SimilarWeb effectively, a large number of IP addresses are required. By using only three parameters — API_KEY
, url
, and wait
— we can obtain as many addresses as possible.
This tells ScrapeOps that we’re willing to wait 3 seconds for the content to load, without concern for the country through which we’re routed.
This approach provides us with the largest possible pool of IP addresses since routing can happen through any server that ScrapeOps supports.
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
The code below holds our crawler that is ready for production.
import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
from dataclasses import dataclass, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
API_KEY = ""
# Load API key from config
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Dataclass representing individual search results
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name).strip()
setattr(self, field.name, value)
# Class for handling data storage to CSV
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Function to set up Selenium WebDriver
def setup_driver():
options = Options()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
# Function to scrape search results using Selenium
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
# Setup and start Selenium WebDriver
driver = setup_driver()
scrapeops_proxy_url = get_scrapeops_url(url)
driver.get(scrapeops_proxy_url)
logger.info(f"Received page from: {url}")
time.sleep(3) # Wait for the page to load
# Find rows in the search results
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")
rank = 1
for row in rows:
link_holder = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare")
site_name = link_holder.text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column.top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find_element(By.CSS_SELECTOR, "span").get_attribute("class").split(" ")[1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text)
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text)
average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip()
pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text)
bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip()
# Create a SearchData object
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
# Add data to the pipeline
data_pipeline.add_data(search_data)
rank += 1
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries - tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max retries exceeded for: {url}")
# Function to start the scraping process for a list of keywords
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
# Main execution
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Input list of keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
# Initialize DataPipeline
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
# Start the scraping process
start_scrape(keyword_list, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS, retries=MAX_RETRIES)
# Close the pipeline after scraping
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
Step 5: Production Run
Alright! Time to run this code in production.
As you may have noticed, MAX_THREADS is set to 5. Since we're only searching 2 categories, ThreadPoolExecutor will only use 2 of those threads to finish the job.
In the second half of our article, we'll make use of all 5 threads when writing the scraper.
Here is our main:
# Main execution
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Input list of keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
# Initialize DataPipeline
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
# Start the scraping process
start_scrape(keyword_list, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS, retries=MAX_RETRIES)
# Close the pipeline after scraping
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
The results from our crawl are inconsistent.
In one instance, it took 36.82 seconds, while in another, it took 52.59 seconds.
This happens because, when SimilarWeb starts blocking us, ScrapeOps retries through new servers to make sure every request is completed.
Build A SimilarWeb Scraper
Now that the results are being saved after running a proper crawl, we need to utilize those results. In this section, we'll scrape the competitors for each site that was extracted during the crawl.
The scraper should do the following:
- Load the CSV into an array.
- Extract the websites from the array.
- Save the competitor data after extracting it.
- Perform steps 2 and 3 simultaneously for quicker results.
- Work with the ScrapeOps Proxy Aggregator to bypass anti-bots and other obstacles.
Step 1: Create Simple Website Data Parser
Just like before, we'll begin with a parsing function. This one will locate all of the competitor objects on the page and pull out their data.
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
driver = setup_driver()
driver.get(url)
time.sleep(3) # Allow page to load
# Check if blocked by a modal or warning
try:
blocked_modal = driver.find_element(By.CSS_SELECTOR, "div.wa-limit-modal")
if blocked_modal:
raise Exception("Blocked by modal")
except:
pass # No blocking modal
# Extract competitor data
competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")
for competitor in competitors:
site_name = competitor.find_element(By.CSS_SELECTOR, "span.wa-competitors__list-item-title").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip()
target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column")
monthly_visits = target_spans[2].text.strip()
category = target_spans[3].text.strip()
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip())
competitor_data = {
"name": site_name,
"url": link,
"affinity": affinity,
"monthly_visits": monthly_visits,
"category": category,
"category_rank": category_rank
}
print(competitor_data) # Replace with actual storage mechanism
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
This time, during our parse, we execute these steps:
- Find all of the competitor rows: driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item").
- Iterate through the competitor rows.
- For each competitor, we pull the following (the category_rank cleanup is sketched after this list):
  - site_name
  - affinity
  - monthly_visits
  - category
  - category_rank
- We construct the url by once again formatting the site_name.
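The category rank cell needs a little cleanup before it can be stored as an integer; the chained replace() calls from the parser behave like this (the sample strings are hypothetical):
def clean_category_rank(raw_text):
    # "#1,234" -> 1234, and "--" (no rank shown) -> 0
    return int(raw_text.replace("#", "").replace(",", "").replace("--", "0").strip())

print(clean_category_rank("#1,234"))   # 1234
print(clean_category_rank("--"))       # 0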
Step 2: Loading URLs To Scrape
We have our parsing function, but it requires a URL to operate.
In this case, we'll include another function that retrieves URLs from the CSV file and applies process_website()
to every row in the file.
Below is our process_results()
function.
def process_results(csv_file, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_website(row, retries=retries)
Check out the complete code given below:
import os
import csv
import json
import time
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# ScrapeOps API Key (if you're using a proxy service like ScrapeOps)
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Setup Selenium WebDriver
def setup_driver():
options = Options()
options.add_argument("--headless") # Run in headless mode for efficiency
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Function to scrape search results (fully Selenium-based)
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
# Initialize WebDriver and load page
driver = setup_driver()
scrapeops_proxy_url = get_scrapeops_url(url)
driver.get(scrapeops_proxy_url)
time.sleep(3) # Allow page to load
logger.info(f"Opened URL: {url}")
# Find all rows of the search results table
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")
rank = 1
for row in rows:
site_name = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
# Rank change processing
rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find_element(By.TAG_NAME, "span").get_attribute("class").split()[-1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text.strip())
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text.strip())
average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip()
pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text.strip())
bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip()
# Create data object
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank += 1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max retries exceeded: {retries}")
# Function to process and scrape all search results concurrently
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
# Function to process websites (Selenium-based)
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
driver = setup_driver()
driver.get(url)
time.sleep(3) # Allow page to load
# Check if blocked by a modal or warning
try:
blocked_modal = driver.find_element(By.CSS_SELECTOR, "div.wa-limit-modal")
if blocked_modal:
raise Exception("Blocked by modal")
except:
pass # No blocking modal
# Extract competitor data
competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")
for competitor in competitors:
site_name = competitor.find_element(By.CSS_SELECTOR, "span.wa-competitors__list-item-title").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip()
target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column")
monthly_visits = target_spans[2].text.strip()
category = target_spans[3].text.strip()
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip())
competitor_data = {
"name": site_name,
"url": link,
"affinity": affinity,
"monthly_visits": monthly_visits,
"category": category,
"category_rank": category_rank
}
print(competitor_data) # Replace with actual storage mechanism
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
# Function to load and process CSV results
def process_results(csv_file, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_website(row, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Example keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
aggregate_files = []
# Crawl and save results
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
# Process each CSV file
for file in aggregate_files:
process_results(file, retries=MAX_RETRIES)
process_results()
loads our CSV into an array. We apply process_website()
to each row of the file.
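Each row handed to process_website() is just a dictionary keyed by the CSV headers, so it looks roughly like this (the values are illustrative):
# One row from arts-and-entertainment.csv as csv.DictReader returns it.
row = {
    "name": "pikabu.ru",
    "url": "https://www.similarweb.com/website/pikabu.ru/",
    "rank": "1",                 # DictReader returns every field as a string
    "rank_change": "2",
    "average_visit": "00:09:15",
    "pages_per_visit": "7.5",
    "bounce_rate": "25%",
}
# process_website() reads row["url"]; later revisions also use row["name"]
# to name the per-site competitor CSV.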
Step 3: Storing the Scraped Data
Scraping would be pointless if we didn't store the data. Since we already have the DataPipeline, we just need a dataclass to feed into it.
We'll create a new one called CompetitorData
, which is quite similar to our SearchData
. Below is our CompetitorData
class.
@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
In the revised code below, inside our parsing function, we open another DataPipeline
and pass CompetitorData
into it.
import os
import csv
import json
import time
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
from dataclasses import dataclass, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Setup Selenium WebDriver
def setup_driver():
options = Options()
options.add_argument("--headless") # Run in headless mode for efficiency
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Function to scrape search results (fully Selenium-based)
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
# Initialize WebDriver and load page
driver = setup_driver()
scrapeops_proxy_url = get_scrapeops_url(url)
driver.get(scrapeops_proxy_url)
time.sleep(3) # Allow page to load
logger.info(f"Opened URL: {url}")
# Find all rows of the search results table
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")
rank = 1
for row in rows:
site_name = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
# Rank change processing
rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find_element(By.TAG_NAME, "span").get_attribute("class").split()[-1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text.strip())
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text.strip())
average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip()
pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text.strip())
bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip()
# Create data object
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank += 1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max retries exceeded: {retries}")
# Function to process and scrape all search results concurrently
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
# Function to process websites (Selenium-based) and extract competitor data
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
driver = setup_driver()
driver.get(url)
time.sleep(3) # Allow page to load
            # Check if blocked by a modal or warning.
            # find_elements() returns an empty list when the modal is absent,
            # so the exception below only fires when we are actually blocked.
            blocked_modal = driver.find_elements(By.CSS_SELECTOR, "div.wa-limit-modal")
            if blocked_modal:
                raise Exception("Blocked by modal")
# Extract competitor data
competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")
competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}_competitors.csv")
for competitor in competitors:
site_name = competitor.find_element(By.CSS_SELECTOR, "span.wa-competitors__list-item-title").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip()
target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column")
monthly_visits = target_spans[2].text.strip()
category = target_spans[3].text.strip()
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip())
competitor_data = CompetitorData(
name=site_name,
url=link,
affinity=affinity,
monthly_visits=monthly_visits,
category=category,
category_rank=category_rank
)
competitor_pipeline.add_data(competitor_data)
competitor_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
# Function to load and process CSV results
def process_results(csv_file, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_website(row, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Example keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
aggregate_files = []
# Crawl and save results
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
# Process each CSV file
for file in aggregate_files:
process_results(file, retries=MAX_RETRIES)
`CompetitorData` is used to represent the competitors we extract from the page.
Inside of our parsing function, we open a new `DataPipeline` and pass these `CompetitorData` objects into it.
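To make that relationship concrete, here's a minimal standalone sketch (assuming the `CompetitorData` and `DataPipeline` classes from the listing above; the values are made up) showing how items flow through the pipeline:

```python
# Assumes CompetitorData and DataPipeline from the listing above are already defined.
pipeline = DataPipeline(csv_filename="example_competitors.csv")

competitor = CompetitorData(
    name="example.com",   # illustrative values only
    url="https://www.similarweb.com/website/example.com/",
    affinity="100%",
    monthly_visits="1.2M",
    category="Arts and Entertainment > Humor",
    category_rank=42
)

pipeline.add_data(competitor)   # queued for storage
pipeline.add_data(competitor)   # same name, logged as a duplicate and dropped
pipeline.close_pipeline()       # flushes the queue to example_competitors.csv
```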
Step 4: Adding Concurrency
We now need to add concurrency. This time, instead of searching multiple categories, we need to run our parsing function on multiple rows at the same time.
To achieve this, we're going to refactor `process_results()` to take advantage of multiple threads using `ThreadPoolExecutor`.
Below is our multithreaded version of `process_results()`.
def process_results(csv_file, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_website,
reader,
[retries] * len(reader)
)
We want to call the `process_website()` function across multiple threads. `reader` is the array of rows we want to process, and `retries` is passed in as an array of the same length as `reader`.
All of the arguments to `process_website()` are passed into `executor.map()` as arrays, and `executor.map()` then forwards one element of each array into each call to `process_website()`.
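If the way `executor.map()` fans those arrays out isn't obvious, here's a tiny self-contained sketch of the same pattern, using a hypothetical `describe()` function in place of `process_website()`:

```python
import concurrent.futures

# Hypothetical stand-in for process_website(): one row dict, one retry count.
def describe(row, retries):
    return f"{row['name']} (retries={retries})"

rows = [{"name": "example.com"}, {"name": "example.org"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Element i of each iterable is bundled into one call: describe(rows[i], 3).
    results = list(executor.map(describe, rows, [3] * len(rows)))

print(results)
# ['example.com (retries=3)', 'example.org (retries=3)']
```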
Below is the full code we've written so far.
import os
import csv
import json
import time
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
from dataclasses import dataclass, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Setup Selenium WebDriver
def setup_driver():
options = Options()
options.add_argument("--headless") # Run in headless mode for efficiency
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Function to scrape search results (fully Selenium-based)
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
# Initialize WebDriver and load page
driver = setup_driver()
scrapeops_proxy_url = get_scrapeops_url(url)
driver.get(scrapeops_proxy_url)
time.sleep(3) # Allow page to load
logger.info(f"Opened URL: {url}")
# Find all rows of the search results table
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")
rank = 1
for row in rows:
site_name = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
# Rank change processing
rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find_element(By.TAG_NAME, "span").get_attribute("class").split()[-1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text.strip())
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text.strip())
average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip()
pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text.strip())
bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip()
# Create data object
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank += 1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max retries exceeded: {retries}")
# Function to process and scrape all search results concurrently
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
# Function to process websites (Selenium-based) and extract competitor data
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
driver = setup_driver()
driver.get(url)
time.sleep(3) # Allow page to load
            # Check if blocked by a modal or warning.
            # find_elements() returns an empty list when the modal is absent,
            # so the exception below only fires when we are actually blocked.
            blocked_modal = driver.find_elements(By.CSS_SELECTOR, "div.wa-limit-modal")
            if blocked_modal:
                raise Exception("Blocked by modal")
# Extract competitor data
competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")
competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}_competitors.csv")
for competitor in competitors:
site_name = competitor.find_element(By.CSS_SELECTOR, "span.wa-competitors__list-item-title").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip()
target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column")
monthly_visits = target_spans[2].text.strip()
category = target_spans[3].text.strip()
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip())
competitor_data = CompetitorData(
name=site_name,
url=link,
affinity=affinity,
monthly_visits=monthly_visits,
category=category,
category_rank=category_rank
)
competitor_pipeline.add_data(competitor_data)
competitor_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_website,
reader,
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Example keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
aggregate_files = []
# Crawl and save results
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
# Process each CSV file
for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 5: Bypassing Anti-Bots
When viewing reports for individual sites, we often get blocked. You can see an example of this in the image below. To bypass this, we'll use the proxy function we wrote earlier.
We only need to change two lines of our parsing function to route each request through the proxy URL:
proxy_url = get_scrapeops_url(url)
driver.get(proxy_url)
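The payload built by `get_scrapeops_url()` already includes `"wait": 3000`, which tells the proxy layer to wait about three seconds so the page can render before the response comes back. If you want to sanity-check what the wrapped URL looks like, here's a quick standalone snippet reusing the same function (with a placeholder API key):

```python
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder

def get_scrapeops_url(url):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "wait": 3000  # give the page time to render before the proxy responds
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# The target URL gets percent-encoded into the proxy URL.
print(get_scrapeops_url("https://www.similarweb.com/website/example.com/"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.similarweb.com%2F...&wait=3000
```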
Here is the complete code.
import os
import csv
import json
import time
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
from dataclasses import dataclass, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url):
payload = {
"api_key": API_KEY,
"url": url,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Setup Selenium WebDriver
def setup_driver():
options = Options()
options.add_argument("--headless") # Run in headless mode for efficiency
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
rank_change: int = 0
average_visit: str = ""
pages_per_visit: float = 0.0
bounce_rate: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class CompetitorData:
name: str = ""
url: str = ""
affinity: str = ""
monthly_visits: str = ""
category: str = ""
category_rank: int = None
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Function to scrape search results (fully Selenium-based)
def scrape_search_results(keyword, data_pipeline=None, retries=3):
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
tries = 0
success = False
while tries <= retries and not success:
try:
# Initialize WebDriver and load page
driver = setup_driver()
scrapeops_proxy_url = get_scrapeops_url(url)
driver.get(scrapeops_proxy_url)
time.sleep(3) # Allow page to load
logger.info(f"Opened URL: {url}")
# Find all rows of the search results table
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")
rank = 1
for row in rows:
site_name = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
# Rank change processing
rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column--rank-change")
rank_change = 0
up_or_down = rank_change_holder.find_element(By.TAG_NAME, "span").get_attribute("class").split()[-1]
if "change--up" in up_or_down:
rank_change += int(rank_change_holder.text.strip())
elif "change--down" in up_or_down:
rank_change -= int(rank_change_holder.text.strip())
average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip()
pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text.strip())
bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip()
# Create data object
search_data = SearchData(
name=site_name,
url=link,
rank=rank,
rank_change=rank_change,
average_visit=average_visit,
pages_per_visit=pages_per_visit,
bounce_rate=bounce_rate
)
rank += 1
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max retries exceeded: {retries}")
# Function to process and scrape all search results concurrently
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
# Function to process websites (Selenium-based) and extract competitor data
def process_website(row, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
driver = setup_driver()
scrapeops_proxy_url = get_scrapeops_url(url)
driver.get(scrapeops_proxy_url)
time.sleep(3) # Allow page to load
            # Check if blocked by a modal or warning.
            # find_elements() returns an empty list when the modal is absent,
            # so the exception below only fires when we are actually blocked.
            blocked_modal = driver.find_elements(By.CSS_SELECTOR, "div.wa-limit-modal")
            if blocked_modal:
                raise Exception("Blocked by modal")
# Extract competitor data
competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")
competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}_competitors.csv")
for competitor in competitors:
site_name = competitor.find_element(By.CSS_SELECTOR, "span.wa-competitors__list-item-title").text.strip()
link = f"https://www.similarweb.com/website/{site_name}/"
affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip()
target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column")
monthly_visits = target_spans[2].text.strip()
category = target_spans[3].text.strip()
category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip())
competitor_data = CompetitorData(
name=site_name,
url=link,
affinity=affinity,
monthly_visits=monthly_visits,
category=category,
category_rank=category_rank
)
competitor_pipeline.add_data(competitor_data)
competitor_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_website,
reader,
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Example keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
aggregate_files = []
# Crawl and save results
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
# Process each CSV file
for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Time to run this entire thing in production! We're going to use the same settings as before. Here is our `main` block if you need a refresher.
Since there was such a spread in our crawl times, we'll estimate the crawl at 30 seconds.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
logger.info(f"Crawl starting...")
# Example keywords to scrape
keyword_list = [
{"category": "arts-and-entertainment", "subcategory": "humor"},
{"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
]
aggregate_files = []
# Crawl and save results
filename = "arts-and-entertainment"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
# Process each CSV file
for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
The full run took 1,073.349 seconds and the crawl portion generated a CSV file with 100 results. As mentioned earlier, we'll estimate the crawl at 30 seconds, which leaves 1,073.349 - 30 = 1,043.349 seconds spent scraping competitors.
1,043.349 seconds / 100 results = 10.433 seconds per result.
Especially considering the 3 second wait time for content to render, this is pretty good!
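If you'd like to reproduce the math above, it's only a few lines:

```python
# Napkin math for the production run.
total_runtime = 1073.349      # seconds for the full run (crawl + competitor scrape)
estimated_crawl = 30          # seconds, our earlier estimate for the crawl itself
results = 100                 # rows in the crawl CSV

scrape_time = total_runtime - estimated_crawl
print(scrape_time)            # ~1043.349 seconds scraping competitors
print(scrape_time / results)  # ~10.433 seconds per result
```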
Legal and Ethical Considerations
Web scraping comes with important legal and ethical responsibilities. Scraping public data—information that is not gated behind a login—is typically considered legal, much like taking a photograph of a billboard.
However, scraping private data introduces entirely different challenges. Accessing data behind a login or other restrictions may violate intellectual property laws and privacy regulations.
Even when dealing with public data, it's essential to respect the Terms and Conditions of the target website and adhere to its `robots.txt` file. Ignoring these rules could result in account suspension or permanent bans.
You can view these for SimilarWeb by checking the links below.
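If you'd like to check a path programmatically before crawling it, Python's standard library includes a `robots.txt` parser. Here's a minimal sketch (the path checked is only an example):

```python
from urllib.robotparser import RobotFileParser

# Fetch and parse SimilarWeb's robots.txt, then check a sample path.
parser = RobotFileParser()
parser.set_url("https://www.similarweb.com/robots.txt")
parser.read()

print(parser.can_fetch("*", "https://www.similarweb.com/top-websites/"))
```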
If you're unsure whether your scraper is legal, you should talk to an attorney.
Conclusion
In conclusion, SimilarWeb can be a very difficult site to access without an account, let alone scrape. The ScrapeOps Proxy Aggregator gives us the ability to constantly rotate to new IP addresses as SimilarWeb blocks them.
You got a crash course in iterative development, and you should now understand parsing, data storage, concurrency, and proxy integration.
If you'd like to learn more about the tech stack used in this article, take a look at the links below.
More Python Web Scraping Guides
At ScrapeOps, we provide a wealth of educational material for developers at all skill levels.
Whether you're just learning how to code or you've been writing software for years, we've got something for you. We even wrote the Selenium Web Scraping Playbook.
If you want to read more from our "How To Scrape" series, check out the articles below.