rank
, category
, rank_change
, average_vist
, pages_per_visit
, and bounce_rate
. Each of these metrics can provide critical data and insight into what users are doing when they access the site.
Then check out ScrapeOps, the complete toolkit for web scraping.
config.json
file.{"api_key": "your-super-secret-api-key"}
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CompetitorData: name: str = "" url: str = "" affinity: str = "" monthly_visits: str = "" category: str = "" category_rank: int = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") rows = soup.find_all("tr", class_="top-table__row") rank = 1 for row in rows: link_holder = row.find("a", class_="tw-table__compare") site_name = link_holder.text link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find("span").get("class")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find("span", class_="tw-table__avg-visit-duration").text pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text) bounce_rate = row.find("span", class_="tw-table__bounce-rate").text search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank+=1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_website(row, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") else: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") blocked = soup.find("div", class_="wa-limit-modal") if blocked: raise Exception(f"Blocked") competitors = soup.find_all("div", class_="wa-competitors__list-item") competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for competitor in competitors: site_name = competitor.find("span", class_="wa-competitors__list-item-title").text link = f"https://www.similarweb.com/website/{site_name}/" affinity = competitor.find("span", class_="app-progress__value").text target_spans = competitor.find_all("span", "wa-competitors__list-column") monthly_visits = target_spans[2].text category = target_spans[3].text category_rank = int(target_spans[4].text.replace("#", "").replace(",", 
"").replace("--", "0")) competitor_data = CompetitorData( name=site_name, url=link, affinity=affinity, monthly_visits=monthly_visits, category=category, category_rank=category_rank ) competitor_pipeline.add_data(competitor_data) competitor_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_website(row, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}] aggregate_files = [] ## Job Processes filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, retries=MAX_RETRIES)
MAX_THREADS
: Defines the number of concurrent threads used during the scraping and processing tasks.MAX_RETRIES
: Determines the maximum number of retries the script will attempt if a request fails (e.g., due to a network issue or a non-200 status code).keyword_list
: A list of dictionaries where each dictionary contains a "category" and "subcategory" that specify the type of websites to scrape from SimilarWeb.filename
: The base name used to create the CSV file where the scraped data will be saved.https://www.similarweb.com/top-websites/arts-and-entertainment/humor/
https://www.similarweb.com/top-websites/{CATEGORY}/{SUBCATEGORY}/
category
and a subcategory
. In this case, our category
is "arts-and-entertainment"
while our subcategory is "humor"
.You can view a shot of the page below.https://www.similarweb.com/website/pikabu.ru/
https://www.similarweb.com/website/{NAME_OF_SITE}/
wait
parameter when talking to ScrapeOps. After we have our loaded page, we just need to find the information using its CSS class.For the results pages, each row has a class of top-table__row
. We can find all these rows and easily extract their data from there.div
elements with the class
of wa-competitors__list-item
. Each of these div
tags holds all the data for each competitor.div
with a class
of wa-limit-modal
.country
parameter. However, with SimilarWeb we don't want to control our geolocation.Instead of controlling our location, we want as many IP addresses as possible to reduce our likelihood of getting blocked and asked to sign in/ sign up like you saw in the previous section.By not controlling our location, this gives us a much larger pool of IP addresses to use.mkdir similarweb-scraper cd similarweb-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
scrape_search_results()
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") rows = soup.find_all("tr", class_="top-table__row") rank = 1 for row in rows: link_holder = row.find("a", class_="tw-table__compare") site_name = link_holder.text link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find("span").get("class")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find("span", class_="tw-table__avg-visit-duration").text pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text) bounce_rate = row.find("span", class_="tw-table__bounce-rate").text search_data = { "name": site_name, "url": link, "rank": rank, "rank_change": rank_change, "average_visit": average_visit, "pages_per_visit": pages_per_visit, "bounce_rate": bounce_rate } rank+=1 print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, retries=3): for keyword in keywords: scrape_search_results(keyword, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}] aggregate_files = [] ## Job Processes filename = "arts-and-entertainment" start_scrape(keyword_list, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
rows = soup.find_all("tr", class_="top-table__row")
.link_holder
with row.find("a", class_="tw-table__compare")
.link_holder
object, we extract our site_name
and construct our link
.rank_change_holder.find("span").get("class")[1]
is used to find whether the rank went up or down.row.find("span", class_="tw-table__avg-visit-duration").text
.float(row.find("span", class_="tw-table__pages-per-visit").text)
finds our pages_per_visit
.bounce_rate
with row.find("span", class_="tw-table__bounce-rate").text
.dataclass
, SearchData
.SearchData
will be used to represent individual objects from our search results. Once we have a SearchData
object, we need to pass it into a DataPipeline
.Our DataPipeline
is used to open a pipe to a CSV file. The pipeline filters out duplicates by name
and then saves all non-duplicate objects to a CSV file.Here is our SearchData
class. We use this to represent individual ranking results.@dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
DataPipeline
.class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv()
DataPipeline
and pass it into start_scrape()
. start_scrape()
then passes the pipeline into our parsing function.Instead of printing our parsed data, we now pass that into the pipeline. Once we're finished parsing the results, we go ahead and close the DataPipeline
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") rows = soup.find_all("tr", class_="top-table__row") rank = 1 for row in rows: link_holder = row.find("a", class_="tw-table__compare") site_name = link_holder.text link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find("span").get("class")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find("span", class_="tw-table__avg-visit-duration").text pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text) bounce_rate = row.find("span", class_="tw-table__bounce-rate").text search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank+=1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, data_pipeline=None, retries=3): for keyword in keywords: scrape_search_results(keyword, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}] aggregate_files = [] ## Job Processes filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
SearchData
.SearchData
objects then get passed into our DataPipeline
and saved to a CSV file.ThreadPoolExecutor
to add support for multithreading. Once we can open multiple threads, we can use those threads to run our parsing function on multiple pages concurrently.Here is our start_scrape()
function adjusted for concurrency.def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) )
scrape_search_results
is the function we'd like to call using multiple threads.keywords
is the array of things we'd like to search.scrape_search_results
get passed in as arrays.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") rows = soup.find_all("tr", class_="top-table__row") rank = 1 for row in rows: link_holder = row.find("a", class_="tw-table__compare") site_name = link_holder.text link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find("span").get("class")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find("span", class_="tw-table__avg-visit-duration").text pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text) bounce_rate = row.find("span", class_="tw-table__bounce-rate").text search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank+=1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}] aggregate_files = [] ## Job Processes filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
API_KEY
, url
and wait
. This tells ScrapeOps that we want to wait
3 seconds for content to render and we don't care which country we're routed through.This gives us the largest pool of potential IP addresses because we can be routed through any server that ScrapeOps supports.def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") rows = soup.find_all("tr", class_="top-table__row") rank = 1 for row in rows: link_holder = row.find("a", class_="tw-table__compare") site_name = link_holder.text link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find("span").get("class")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find("span", class_="tw-table__avg-visit-duration").text pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text) bounce_rate = row.find("span", class_="tw-table__bounce-rate").text search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank+=1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}] aggregate_files = [] ## Job Processes filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
MAX_THREADS
set to 5. We're only searching 2 categories, so ThreadPoolExecutor
will run this on 2 threads and finish it out.In the next half of our article, when we write the scraper, we'll take advantage of all 5 threads.Here is our main
.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}] aggregate_files = [] ## Job Processes filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
def process_website(row, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") else: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") blocked = soup.find("div", class_="wa-limit-modal") if blocked: raise Exception(f"Blocked") competitors = soup.find_all("div", class_="wa-competitors__list-item") for competitor in competitors: site_name = competitor.find("span", class_="wa-competitors__list-item-title").text link = f"https://www.similarweb.com/website/{site_name}/" affinity = competitor.find("span", class_="app-progress__value").text target_spans = competitor.find_all("span", "wa-competitors__list-column") monthly_visits = target_spans[2].text category = target_spans[3].text category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0")) competitor_data = { "name": site_name, "url": link, "affinity": affinity, "monthly_visits": monthly_visits, "category": category, "category_rank": category_rank } print(competitor_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}")
soup.find_all("div", class_="wa-competitors__list-item")
.site_name
affinity
monthly_visits
category
category_link
site_name
.process_website()
on each row from the file.Here is our process_results()
function.def process_results(csv_file, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_website(row, retries=retries)
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") rows = soup.find_all("tr", class_="top-table__row") rank = 1 for row in rows: link_holder = row.find("a", class_="tw-table__compare") site_name = link_holder.text link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find("span").get("class")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find("span", class_="tw-table__avg-visit-duration").text pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text) bounce_rate = row.find("span", class_="tw-table__bounce-rate").text search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank+=1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_website(row, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") else: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") blocked = soup.find("div", class_="wa-limit-modal") if blocked: raise Exception(f"Blocked") competitors = soup.find_all("div", class_="wa-competitors__list-item") for competitor in competitors: site_name = competitor.find("span", class_="wa-competitors__list-item-title").text link = f"https://www.similarweb.com/website/{site_name}/" affinity = competitor.find("span", class_="app-progress__value").text target_spans = competitor.find_all("span", "wa-competitors__list-column") monthly_visits = target_spans[2].text category = target_spans[3].text category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0")) competitor_data = { "name": site_name, "url": link, "affinity": 
affinity, "monthly_visits": monthly_visits, "category": category, "category_rank": category_rank } print(competitor_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_website(row, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}] aggregate_files = [] ## Job Processes filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, retries=MAX_RETRIES)
process_results()
reads our CSV into an array.process_website()
on the row.DataPipeline
, we just need a dataclass
to feed into it. We're going to create a new one called CompetitorData
. It's very much like our SearchData
.Here is our CompetitorData
class.@dataclassclass CompetitorData: name: str = "" url: str = "" affinity: str = "" monthly_visits: str = "" category: str = "" category_rank: int = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
DataPipeline
inside our parsing function and we pass CompetitorData
into it.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CompetitorData: name: str = "" url: str = "" affinity: str = "" monthly_visits: str = "" category: str = "" category_rank: int = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") rows = soup.find_all("tr", class_="top-table__row") rank = 1 for row in rows: link_holder = row.find("a", class_="tw-table__compare") site_name = link_holder.text link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find("span").get("class")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find("span", class_="tw-table__avg-visit-duration").text pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text) bounce_rate = row.find("span", class_="tw-table__bounce-rate").text search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank+=1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_website(row, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") else: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") blocked = soup.find("div", class_="wa-limit-modal") if blocked: raise Exception(f"Blocked") competitors = soup.find_all("div", class_="wa-competitors__list-item") competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for competitor in competitors: site_name = competitor.find("span", class_="wa-competitors__list-item-title").text link = f"https://www.similarweb.com/website/{site_name}/" affinity = competitor.find("span", class_="app-progress__value").text target_spans = competitor.find_all("span", "wa-competitors__list-column") monthly_visits = target_spans[2].text category = target_spans[3].text category_rank = int(target_spans[4].text.replace("#", "").replace(",", 
"").replace("--", "0")) competitor_data = CompetitorData( name=site_name, url=link, affinity=affinity, monthly_visits=monthly_visits, category=category, category_rank=category_rank ) competitor_pipeline.add_data(competitor_data) competitor_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_website(row, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}] aggregate_files = [] ## Job Processes filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, retries=MAX_RETRIES)
CompetitorData
is used to represent the competitors we extract from the page.DataPipeline
inside of our parsing function and pass these CompetitorData
objects into the pipeline.process_results()
to take advantage of multiple threads using ThreadPoolExecutor
.Here is our multithreaded process_results()
.def process_results(csv_file, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_website, reader, [retries] * len(reader) )
process_website
is the function we want to call on multiple threads.reader
is the array of objects that we want to process with multiple threads.retries
gets passed in as an array the length of reader
as well.process_website
get passed into executor.map()
as arrays. These then get passed into process_website
.Here is our full code up to this point.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CompetitorData: name: str = "" url: str = "" affinity: str = "" monthly_visits: str = "" category: str = "" category_rank: int = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") rows = soup.find_all("tr", class_="top-table__row") rank = 1 for row in rows: link_holder = row.find("a", class_="tw-table__compare") site_name = link_holder.text link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find("span").get("class")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find("span", class_="tw-table__avg-visit-duration").text pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text) bounce_rate = row.find("span", class_="tw-table__bounce-rate").text search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank+=1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_website(row, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") else: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") blocked = soup.find("div", class_="wa-limit-modal") if blocked: raise Exception(f"Blocked") competitors = soup.find_all("div", class_="wa-competitors__list-item") competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for competitor in competitors: site_name = competitor.find("span", class_="wa-competitors__list-item-title").text link = f"https://www.similarweb.com/website/{site_name}/" affinity = competitor.find("span", class_="app-progress__value").text target_spans = competitor.find_all("span", "wa-competitors__list-column") monthly_visits = target_spans[2].text category = target_spans[3].text category_rank = int(target_spans[4].text.replace("#", "").replace(",", 
"").replace("--", "0")) competitor_data = CompetitorData( name=site_name, url=link, affinity=affinity, monthly_visits=monthly_visits, category=category, category_rank=category_rank ) competitor_pipeline.add_data(competitor_data) competitor_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_website, reader, [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}] aggregate_files = [] ## Job Processes filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
response = requests.get(get_scrapeops_url(url))
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"]© def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CompetitorData: name: str = "" url: str = "" affinity: str = "" monthly_visits: str = "" category: str = "" category_rank: int = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") rows = soup.find_all("tr", class_="top-table__row") rank = 1 for row in rows: link_holder = row.find("a", class_="tw-table__compare") site_name = link_holder.text link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find("td", class_="top-table__column top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find("span").get("class")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find("span", class_="tw-table__avg-visit-duration").text pages_per_visit = float(row.find("span", class_="tw-table__pages-per-visit").text) bounce_rate = row.find("span", class_="tw-table__bounce-rate").text search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank+=1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_website(row, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url)) try: if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") else: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") blocked = soup.find("div", class_="wa-limit-modal") if blocked: raise Exception(f"Blocked") competitors = soup.find_all("div", class_="wa-competitors__list-item") competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for competitor in competitors: site_name = competitor.find("span", class_="wa-competitors__list-item-title").text link = f"https://www.similarweb.com/website/{site_name}/" affinity = competitor.find("span", class_="app-progress__value").text target_spans = competitor.find_all("span", "wa-competitors__list-column") monthly_visits = target_spans[2].text category = target_spans[3].text category_rank = 
int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0")) competitor_data = CompetitorData( name=site_name, url=link, affinity=affinity, monthly_visits=monthly_visits, category=category, category_rank=category_rank ) competitor_pipeline.add_data(competitor_data) competitor_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_website, reader, [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}] aggregate_files = [] ## Job Processes filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
main
if you need a refresher. Since there was such a spread in our crawl times, we'll estimate the crawl at 30 seconds.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = [{"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}]
    aggregate_files = []

    ## Job Processes
    filename = "arts-and-entertainment"
    crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
    start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
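If you'd rather measure the crawl on your own machine than rely on the 30 second estimate, you can wrap the crawl portion of the main block in a simple timer. This is only a sketch that reuses the names from the script above (keyword_list, crawl_pipeline, MAX_THREADS, and so on):

import time

crawl_start = time.time()

# Same crawl calls as in main, just bracketed by a timer
start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()

elapsed = time.time() - crawl_start
logger.info(f"Crawl finished in {elapsed:.2f} seconds")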
robots.txt
file as well. Violating these could lead to suspension of your account or even a permanent ban. You can view them for SimilarWeb by checking the links below. If you're unsure whether your scraper is legal, you should talk to an attorney. Then check out ScrapeOps, the complete toolkit for web scraping.
config.json
file.{"api_key": "your-super-secret-api-key"}
.import os import csv import json import time import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures from dataclasses import dataclass, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Setup Selenium WebDriver def setup_driver(): options = Options() options.add_argument("--headless") # Run in headless mode for efficiency options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) @dataclass class SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclass class CompetitorData: name: str = "" url: str = "" affinity: str = "" monthly_visits: str = "" category: str = "" category_rank: int = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Function to scrape search results (fully Selenium-based) def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: # Initialize WebDriver and load page driver = setup_driver() scrapeops_proxy_url = get_scrapeops_url(url) driver.get(scrapeops_proxy_url) time.sleep(3) # Allow page to load logger.info(f"Opened URL: {url}") # Find all rows of the search results table rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row") rank = 1 for row in rows: site_name = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").text.strip() link = f"https://www.similarweb.com/website/{site_name}/" # Rank change processing rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find_element(By.TAG_NAME, "span").get_attribute("class").split()[-1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text.strip()) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text.strip()) average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip() pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text.strip()) bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip() # Create data object search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank += 1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded: {retries}") # Function to process and scrape all search results concurrently def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) # Function to process websites (Selenium-based) and extract competitor data def process_website(row, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = setup_driver() scrapeops_proxy_url = get_scrapeops_url(url) driver.get(scrapeops_proxy_url) time.sleep(3) # Allow page to load # Check if blocked by a modal or warning try: blocked_modal = driver.find_element(By.CSS_SELECTOR, "div.wa-limit-modal") if blocked_modal: raise Exception("Blocked by modal") except: pass # No blocking modal # Extract competitor data competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item") competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}_competitors.csv") for competitor in competitors: site_name = 
competitor.find_element(By.CSS_SELECTOR, "span.wa-competitors__list-item-title").text.strip() link = f"https://www.similarweb.com/website/{site_name}/" affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip() target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column") monthly_visits = target_spans[2].text.strip() category = target_spans[3].text.strip() category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip()) competitor_data = CompetitorData( name=site_name, url=link, affinity=affinity, monthly_visits=monthly_visits, category=category, category_rank=category_rank ) competitor_pipeline.add_data(competitor_data) competitor_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_website, reader, [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") # Example keywords to scrape keyword_list = [ {"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"} ] aggregate_files = [] # Crawl and save results filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") # Process each CSV file for file in aggregate_files: process_results(file,max_threads=MAX_THREADS, retries=MAX_RETRIES)
MAX_THREADS
: Defines how many concurrent threads are used for processing and scraping tasks.MAX_RETRIES
Determines the number of retries the script will make if a request fails, such as due to a non-200 status code or network issues.keyword_list
A list of dictionaries, each containing a "category" and "subcategory," which specify the type of websites to be scraped from SimilarWeb.filename
The base name that is used to generate the CSV file where the data obtained from scraping will be saved.https://www.similarweb.com/top-websites/arts-and-entertainment/humor/
https://www.similarweb.com/top-websites/{CATEGORY}/{SUBCATEGORY}/
https://www.similarweb.com/website/pikabu.ru/
https://www.similarweb.com/website/{NAME_OF_SITE}/
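To make these formats concrete, here is a small sketch (not part of the scraper itself) that builds both kinds of URL from the same keyword dictionaries we use in keyword_list:

# Sketch: constructing SimilarWeb URLs from a keyword dict
keyword = {"category": "arts-and-entertainment", "subcategory": "humor"}

top_websites_url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
website_report_url = "https://www.similarweb.com/website/pikabu.ru/"

print(top_websites_url)    # the ranking page for the humor subcategory
print(website_report_url)  # an individual website report page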
wait
parameter to load our dynamic content. Once the page is loaded, we can simply locate the information by using its CSS class.For the results pages, each row has a class of top-table__row
. From there, we can locate all these rows and extract their data with ease.wa-competitors__list-item
. These div tags contain all the information for each individual competitor. If SimilarWeb blocks us, the page instead shows a modal with the class wa-limit-modal.
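As a quick illustration of those three classes in action, the sketch below loads one ranking page with Selenium and counts what it finds. It assumes Chrome is installed and, unlike the full scraper, it goes straight to the site without the proxy, so it may well get blocked:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

options = Options()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)

driver.get("https://www.similarweb.com/top-websites/arts-and-entertainment/humor/")

rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")                     # ranking rows
competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")  # competitor cards (website report pages)
blocked = driver.find_elements(By.CSS_SELECTOR, "div.wa-limit-modal")                 # anti-bot modal

print(f"rows: {len(rows)}, competitors: {len(competitors)}, blocked: {bool(blocked)}")
driver.quit()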
mkdir similarweb-scraper
cd similarweb-scraper
python -m venv venv
source venv/bin/activate
pip install selenium
pip install webdriver-manager
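Optionally, you can confirm that both packages installed into the virtual environment before moving on; this quick check only uses the standard library:

from importlib.metadata import version

# Prints the installed version of each package, or raises if it's missing
for package in ("selenium", "webdriver-manager"):
    print(package, version(package))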
scrape_search_results()
.import os import json import logging from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager import time API_KEY = "" # Load the API key from the config file with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging configuration logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Function to set up the Selenium WebDriver with necessary options def setup_driver(): options = Options() options.add_argument("--headless") # Run in headless mode options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) # Main scraping function using Selenium def scrape_search_results(keyword, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: # Set up and start the WebDriver driver = setup_driver() driver.get(url) logger.info(f"Received page from: {url}") # Wait for the page to load fully time.sleep(3) # Find all rows for the top websites table rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row") rank = 1 for row in rows: link_holder = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare") site_name = link_holder.text link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column.top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find_element(By.CSS_SELECTOR, "span").get_attribute("class").split(" ")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text) bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text # Collecting scraped data search_data = { "name": site_name, "url": link, "rank": rank, "rank_change": rank_change, "average_visit": average_visit, "pages_per_visit": pages_per_visit, "bounce_rate": bounce_rate } rank += 1 print("search data: ",search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries += 1 finally: # Close the WebDriver after each attempt driver.quit() if not success: raise Exception(f"Max retries exceeded for: {url}") # Function to start the scraping process for a list of keywords def start_scrape(keywords, retries=3): for keyword in keywords: scrape_search_results(keyword, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") # Input list of keywords to scrape keyword_list = [ {"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"} ] # Start scraping process start_scrape(keyword_list, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")
.link_holder
with link_holder = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare")
.link_holder
, we extract the site_name
and construct our link.rank_change_holder.find_element(By.CSS_SELECTOR, "span").get_attribute("class").split(" ")[1]
.row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text
.pages_per_visit
is retrieved with float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text)
.row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text
.SearchData
is required. This class will represent individual objects from the search results.Once the SearchData
object is created, it needs to be passed into a DataPipeline. The DataPipeline is responsible for opening a pipe to a CSV file. It removes duplicates by name and then saves all the non-duplicate objects to the CSV file.Below is our SearchData
class, which we use to represent individual ranking results.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    rank: int = 0
    rank_change: int = 0
    average_visit: str = ""
    pages_per_visit: float = 0.0
    bounce_rate: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                else:
                    value = getattr(self, field.name).strip()
                    setattr(self, field.name, value)
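To show what __post_init__ actually does, here is a tiny usage sketch (it assumes the dataclass above is defined in the same file, and the values are made up for the example): empty strings get a default label and other strings are stripped.

row = SearchData(
    name="  pikabu.ru  ",
    url="",
    rank=1,
    rank_change=2,
    average_visit="00:09:16",
    pages_per_visit=10.2,
    bounce_rate="41%"
)

print(row.name)  # "pikabu.ru" (whitespace stripped)
print(row.url)   # "No url" (empty string replaced with a default)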
DataPipeline.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0

        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
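Here is a minimal sketch of how the pipeline gets used, mirroring what our main block does: construct it with a filename, feed it SearchData objects, and close it so anything still in the queue is flushed to the CSV. The filename and values are just examples.

pipeline = DataPipeline(csv_filename="example-output.csv")

pipeline.add_data(SearchData(name="pikabu.ru", url="https://www.similarweb.com/website/pikabu.ru/", rank=1))
pipeline.add_data(SearchData(name="pikabu.ru", url="https://www.similarweb.com/website/pikabu.ru/", rank=1))  # duplicate: logged and dropped

pipeline.close_pipeline()  # flushes the remaining item to example-output.csv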
start_scrape()
when we put everything together.start_scrape()
then sends the pipeline to our parsing function.import os import csv import json import logging import time from dataclasses import dataclass, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager API_KEY = "" # Load API key from config with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging configuration logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Dataclass representing individual search results @dataclass class SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name).strip() setattr(self, field.name, value) # Class for handling data storage to CSV class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Function to set up Selenium WebDriver def setup_driver(): options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) # Function to scrape search results using Selenium def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: # Setup and start Selenium WebDriver driver = setup_driver() driver.get(url) logger.info(f"Received page from: {url}") time.sleep(3) # Wait for the page to load # Find rows in the search results rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row") rank = 1 for row in rows: link_holder = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare") site_name = link_holder.text.strip() link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column.top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find_element(By.CSS_SELECTOR, "span").get_attribute("class").split(" ")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip() pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text) bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip() # Create a SearchData object search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) # Add data to the pipeline data_pipeline.add_data(search_data) rank += 1 logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries - tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded for: {url}") # Function to start the scraping process for a list of keywords def start_scrape(keywords, data_pipeline=None, retries=3): for keyword in keywords: scrape_search_results(keyword, data_pipeline=data_pipeline, retries=retries) # Main execution if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") # Input list of keywords to scrape keyword_list = [ {"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"} ] # Initialize DataPipeline filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") # Start the scraping process start_scrape(keyword_list, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) # Close the pipeline after scraping crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
SearchData
. These SearchData objects are then passed into our DataPipeline and stored in a CSV file.ThreadPoolExecutor
.Once we have the ability to open several threads, we can employ those threads to run our parsing function on multiple pages simultaneously.Below is our start_scrape()
function modified for concurrency.

def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )
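If the repeated-list arguments look odd, this toy example (completely unrelated to scraping) shows how executor.map() lines them up: the i-th element of each iterable becomes the arguments of the i-th call.

import concurrent.futures

def demo_task(keyword, pipeline, retries):
    return f"{keyword} -> pipeline={pipeline}, retries={retries}"

keywords = ["humor", "animation-and-comics"]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(
        demo_task,
        keywords,
        ["shared-pipeline"] * len(keywords),  # same second argument for every call
        [3] * len(keywords)                   # same retry count for every call
    )
    for line in results:
        print(line)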
executor.map() runs scrape_search_results
by utilizing multiple threads. The array keywords
contains the items we wish to search for. All additional arguments to scrape_search_results
are passed in as arrays.import os import csv import json import logging import time from dataclasses import dataclass, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" # Load API key from config with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging configuration logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Dataclass representing individual search results @dataclass class SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name).strip() setattr(self, field.name, value) # Class for handling data storage to CSV class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Function to set up Selenium WebDriver def setup_driver(): options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) # Function to scrape search results using Selenium def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: # Setup and start Selenium WebDriver driver = setup_driver() driver.get(url) logger.info(f"Received page from: {url}") time.sleep(3) # Wait for the page to load # Find rows in the search results rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row") rank = 1 for row in rows: link_holder = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare") site_name = link_holder.text.strip() link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column.top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find_element(By.CSS_SELECTOR, "span").get_attribute("class").split(" ")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip() pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text) bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip() # Create a SearchData object search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) # Add data to the pipeline data_pipeline.add_data(search_data) rank += 1 logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries - tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded for: {url}") # Function to start the scraping process for a list of keywords def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) # Main execution if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") # Input list of keywords to scrape keyword_list = [ {"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"} ] # Initialize DataPipeline filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") # Start the scraping process start_scrape(keyword_list, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS, 
retries=MAX_RETRIES) # Close the pipeline after scraping crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
API_KEY
, url
, and wait
— we can obtain as many addresses as possible. This tells ScrapeOps that we're willing to wait 3 seconds for the content to load, without concern for the country through which we're routed. This approach provides us with the largest possible pool of IP addresses since routing can happen through any server that ScrapeOps supports.

def get_scrapeops_url(url):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "wait": 3000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
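To see exactly what gets requested, you can print the wrapped URL. The sketch below uses a placeholder API key; in the real scripts the key comes from config.json.

from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder

def get_scrapeops_url(url):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "wait": 3000,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

target = "https://www.similarweb.com/top-websites/arts-and-entertainment/humor/"
print(get_scrapeops_url(target))
# The target URL is percent-encoded into the api_key/url/wait query string.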
import os import csv import json import logging import time from urllib.parse import urlencode from dataclasses import dataclass, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" # Load API key from config with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Logging configuration logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Dataclass representing individual search results @dataclass class SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name).strip() setattr(self, field.name, value) # Class for handling data storage to CSV class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Function to set up Selenium WebDriver def setup_driver(): options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) # Function to scrape search results using Selenium def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: # Setup and start Selenium WebDriver driver = setup_driver() scrapeops_proxy_url = get_scrapeops_url(url) driver.get(scrapeops_proxy_url) logger.info(f"Received page from: {url}") time.sleep(3) # Wait for the page to load # Find rows in the search results rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row") rank = 1 for row in rows: link_holder = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare") site_name = link_holder.text.strip() link = f"https://www.similarweb.com/website/{site_name}/" rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column.top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find_element(By.CSS_SELECTOR, "span").get_attribute("class").split(" ")[1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text) average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip() pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text) bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip() # Create a SearchData object search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) # Add data to the pipeline data_pipeline.add_data(search_data) rank += 1 logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries - tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded for: {url}") # Function to start the scraping process for a list of keywords def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) # Main execution if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") # Input list of keywords to scrape keyword_list = [ {"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"} ] # Initialize DataPipeline filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") # Start the scraping process 
start_scrape(keyword_list, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS, retries=MAX_RETRIES) # Close the pipeline after scraping crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
MAX_THREADS
is set to 5. Since we're only searching 2 categories, ThreadPoolExecutor
will use 2 threads to run this and finish it. In the second half of our article, we'll make use of all 5 threads when writing the scraper. Here is our main:

# Main execution
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5

    logger.info(f"Crawl starting...")

    # Input list of keywords to scrape
    keyword_list = [
        {"category": "arts-and-entertainment", "subcategory": "humor"},
        {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
    ]

    # Initialize DataPipeline
    filename = "arts-and-entertainment"
    crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")

    # Start the scraping process
    start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)

    # Close the pipeline after scraping
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")
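Before moving on to the scraper, it's worth a quick sanity check that the crawl actually produced rows. The sketch below just prints the first few records from the crawl CSV; the column names come from the SearchData fields.

import csv

with open("arts-and-entertainment.csv", newline="") as file:
    reader = csv.DictReader(file)
    for i, row in enumerate(reader):
        print(row["rank"], row["name"], row["average_visit"], row["bounce_rate"])
        if i >= 4:  # only show the first five rows
            break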
def process_website(row, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = setup_driver()
            driver.get(url)
            time.sleep(3)  # Allow page to load

            # Check if blocked by a modal or warning
            try:
                blocked_modal = driver.find_element(By.CSS_SELECTOR, "div.wa-limit-modal")
            except:
                blocked_modal = None  # No blocking modal
            if blocked_modal:
                raise Exception("Blocked by modal")

            # Extract competitor data
            competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")
            for competitor in competitors:
                site_name = competitor.find_element(By.CSS_SELECTOR, "span.wa-competitors__list-item-title").text.strip()
                link = f"https://www.similarweb.com/website/{site_name}/"
                affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip()
                target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column")

                monthly_visits = target_spans[2].text.strip()
                category = target_spans[3].text.strip()
                category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip())

                competitor_data = {
                    "name": site_name,
                    "url": link,
                    "affinity": affinity,
                    "monthly_visits": monthly_visits,
                    "category": category,
                    "category_rank": category_rank
                }
                print(competitor_data)  # Replace with actual storage mechanism

            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")
site_name
affinity
monthly_visits
category
category_rank
site_name
.process_website()
to every row in the file.Below is our process_results()
function.

def process_results(csv_file, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_website(row, retries=retries)
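Using it mirrors the main block from the crawl: collect the CSV filenames the crawl produced and hand each one to process_results(). A short sketch, assuming the functions above live in the same file:

aggregate_files = ["arts-and-entertainment.csv"]  # produced by the crawl

for file in aggregate_files:
    process_results(file, retries=3)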
import os import csv import json import time import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures from dataclasses import dataclass, field, fields, asdict # ScrapeOps API Key (if you're using a proxy service like ScrapeOps) API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Setup Selenium WebDriver def setup_driver(): options = Options() options.add_argument("--headless") # Run in headless mode for efficiency options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) @dataclass class SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Function to scrape search results (fully Selenium-based) def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: # Initialize WebDriver and load page driver = setup_driver() scrapeops_proxy_url = get_scrapeops_url(url) driver.get(scrapeops_proxy_url) time.sleep(3) # Allow page to load logger.info(f"Opened URL: {url}") # Find all rows of the search results table rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row") rank = 1 for row in rows: site_name = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").text.strip() link = f"https://www.similarweb.com/website/{site_name}/" # Rank change processing rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find_element(By.TAG_NAME, "span").get_attribute("class").split()[-1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text.strip()) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text.strip()) average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip() pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text.strip()) bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip() # Create data object search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank += 1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded: {retries}") # Function to process and scrape all search results concurrently def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) # Function to process websites (Selenium-based) def process_website(row, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = setup_driver() driver.get(url) time.sleep(3) # Allow page to load # Check if blocked by a modal or warning try: blocked_modal = driver.find_element(By.CSS_SELECTOR, "div.wa-limit-modal") if blocked_modal: raise Exception("Blocked by modal") except: pass # No blocking modal # Extract competitor data competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item") for competitor in competitors: site_name = competitor.find_element(By.CSS_SELECTOR, "span.wa-competitors__list-item-title").text.strip() link = f"https://www.similarweb.com/website/{site_name}/" affinity = 
competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip() target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column") monthly_visits = target_spans[2].text.strip() category = target_spans[3].text.strip() category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip()) competitor_data = { "name": site_name, "url": link, "affinity": affinity, "monthly_visits": monthly_visits, "category": category, "category_rank": category_rank } print(competitor_data) # Replace with actual storage mechanism success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") # Function to load and process CSV results def process_results(csv_file, retries=3): logger.info(f"Processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_website(row, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") # Example keywords to scrape keyword_list = [ {"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"} ] aggregate_files = [] # Crawl and save results filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") # Process each CSV file for file in aggregate_files: process_results(file, retries=MAX_RETRIES)
process_results() loads our CSV into an array. We then apply process_website() to each row of the file, as in the single-threaded excerpt below.
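For reference, here is the single-threaded process_results() on its own, exactly as it appears in the full listing further down:

def process_results(csv_file, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_website(row, retries=retries)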
Each competitor we extract gets stored in a CompetitorData object, which is quite similar to our SearchData. Below is our CompetitorData class.
@dataclass
class CompetitorData:
    name: str = ""
    url: str = ""
    affinity: str = ""
    monthly_visits: str = ""
    category: str = ""
    category_rank: int = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                else:
                    value = getattr(self, field.name)
                    setattr(self, field.name, value.strip())
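As a quick sanity check (a hypothetical snippet, not part of the scraper), __post_init__ strips whitespace from string fields and replaces empty strings with placeholder text:

# Hypothetical illustration of CompetitorData's string cleanup
example = CompetitorData(name="  example.com  ")
print(example.name)      # "example.com" -- whitespace stripped
print(example.affinity)  # "No affinity" -- empty string replaced with a placeholder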
Inside our parsing function, we then open a new DataPipeline and pass CompetitorData
into it.import os import csv import json import time import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures from dataclasses import dataclass, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Setup Selenium WebDriver def setup_driver(): options = Options() options.add_argument("--headless") # Run in headless mode for efficiency options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) @dataclass class SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclass class CompetitorData: name: str = "" url: str = "" affinity: str = "" monthly_visits: str = "" category: str = "" category_rank: int = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Function to scrape search results (fully Selenium-based) def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: # Initialize WebDriver and load page driver = setup_driver() scrapeops_proxy_url = get_scrapeops_url(url) driver.get(scrapeops_proxy_url) time.sleep(3) # Allow page to load logger.info(f"Opened URL: {url}") # Find all rows of the search results table rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row") rank = 1 for row in rows: site_name = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").text.strip() link = f"https://www.similarweb.com/website/{site_name}/" # Rank change processing rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find_element(By.TAG_NAME, "span").get_attribute("class").split()[-1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text.strip()) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text.strip()) average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip() pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text.strip()) bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip() # Create data object search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank += 1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded: {retries}") # Function to process and scrape all search results concurrently def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) # Function to process websites (Selenium-based) and extract competitor data def process_website(row, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = setup_driver() driver.get(url) time.sleep(3) # Allow page to load # Check if blocked by a modal or warning try: blocked_modal = driver.find_element(By.CSS_SELECTOR, "div.wa-limit-modal") if blocked_modal: raise Exception("Blocked by modal") except: pass # No blocking modal # Extract competitor data competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item") competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}_competitors.csv") for competitor in competitors: site_name = competitor.find_element(By.CSS_SELECTOR, 
"span.wa-competitors__list-item-title").text.strip() link = f"https://www.similarweb.com/website/{site_name}/" affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip() target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column") monthly_visits = target_spans[2].text.strip() category = target_spans[3].text.strip() category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip()) competitor_data = CompetitorData( name=site_name, url=link, affinity=affinity, monthly_visits=monthly_visits, category=category, category_rank=category_rank ) competitor_pipeline.add_data(competitor_data) competitor_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") # Function to load and process CSV results def process_results(csv_file, retries=3): logger.info(f"Processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_website(row, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") # Example keywords to scrape keyword_list = [ {"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"} ] aggregate_files = [] # Crawl and save results filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") # Process each CSV file for file in aggregate_files: process_results(file, retries=MAX_RETRIES)
CompetitorData is used to represent the competitors we extract from the page. Inside our parsing function, we open a new DataPipeline and pass these CompetitorData objects into it.
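Here is the relevant excerpt from process_website(); each row (target site) gets its own pipeline writing to its own CSV file:

competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}_competitors.csv")

for competitor in competitors:
    # ...parse site_name, link, affinity, monthly_visits, category and category_rank...
    competitor_data = CompetitorData(
        name=site_name,
        url=link,
        affinity=affinity,
        monthly_visits=monthly_visits,
        category=category,
        category_rank=category_rank
    )
    competitor_pipeline.add_data(competitor_data)

competitor_pipeline.close_pipeline()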
Next, we rewrite process_results() to take advantage of multiple threads using ThreadPoolExecutor. Below is our multithreaded version of process_results().
def process_results(csv_file, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_website,
                reader,
                [retries] * len(reader)
            )
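If the way executor.map() consumes those parallel lists is unfamiliar, here is a tiny standalone illustration (not part of the scraper, only the standard library):

import concurrent.futures

def greet(name, punctuation):
    print(f"Hello, {name}{punctuation}")

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Arguments are zipped positionally: greet("Alice", "!") and greet("Bob", "?")
    executor.map(greet, ["Alice", "Bob"], ["!", "?"])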
executor.map() runs our process_website function across multiple threads. reader is the array of row objects we aim to process with several threads, and retries is also passed in as an array matching the length of the reader array. All of the arguments are handed to executor.map() in array form and are then forwarded, one set per call, into process_website
.Below is the full code we've written so far.import os import csv import json import time import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures from dataclasses import dataclass, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url): payload = { "api_key": API_KEY, "url": url, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Setup Selenium WebDriver def setup_driver(): options = Options() options.add_argument("--headless") # Run in headless mode for efficiency options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options) @dataclass class SearchData: name: str = "" url: str = "" rank: int = 0 rank_change: int = 0 average_visit: str = "" pages_per_visit: float = 0.0 bounce_rate: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclass class CompetitorData: name: str = "" url: str = "" affinity: str = "" monthly_visits: str = "" category: str = "" category_rank: int = None def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Function to scrape search results (fully Selenium-based) def scrape_search_results(keyword, data_pipeline=None, retries=3): url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/" tries = 0 success = False while tries <= retries and not success: try: # Initialize WebDriver and load page driver = setup_driver() scrapeops_proxy_url = get_scrapeops_url(url) driver.get(scrapeops_proxy_url) time.sleep(3) # Allow page to load logger.info(f"Opened URL: {url}") # Find all rows of the search results table rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row") rank = 1 for row in rows: site_name = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").text.strip() link = f"https://www.similarweb.com/website/{site_name}/" # Rank change processing rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column--rank-change") rank_change = 0 up_or_down = rank_change_holder.find_element(By.TAG_NAME, "span").get_attribute("class").split()[-1] if "change--up" in up_or_down: rank_change += int(rank_change_holder.text.strip()) elif "change--down" in up_or_down: rank_change -= int(rank_change_holder.text.strip()) average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip() pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text.strip()) bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip() # Create data object search_data = SearchData( name=site_name, url=link, rank=rank, rank_change=rank_change, average_visit=average_visit, pages_per_visit=pages_per_visit, bounce_rate=bounce_rate ) rank += 1 data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded: {retries}") # Function to process and scrape all search results concurrently def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [data_pipeline] * len(keywords), [retries] * len(keywords) ) # Function to process websites (Selenium-based) and extract competitor data def process_website(row, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = setup_driver() driver.get(url) time.sleep(3) # Allow page to load # Check if blocked by a modal or warning try: blocked_modal = driver.find_element(By.CSS_SELECTOR, "div.wa-limit-modal") if blocked_modal: raise Exception("Blocked by modal") except: pass # No blocking modal # Extract competitor data competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item") competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}_competitors.csv") for competitor in competitors: site_name = competitor.find_element(By.CSS_SELECTOR, 
"span.wa-competitors__list-item-title").text.strip() link = f"https://www.similarweb.com/website/{site_name}/" affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip() target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column") monthly_visits = target_spans[2].text.strip() category = target_spans[3].text.strip() category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip()) competitor_data = CompetitorData( name=site_name, url=link, affinity=affinity, monthly_visits=monthly_visits, category=category, category_rank=category_rank ) competitor_pipeline.add_data(competitor_data) competitor_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_website, reader, [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 logger.info(f"Crawl starting...") # Example keywords to scrape keyword_list = [ {"category": "arts-and-entertainment", "subcategory": "humor"}, {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"} ] aggregate_files = [] # Crawl and save results filename = "arts-and-entertainment" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") # Process each CSV file for file in aggregate_files: process_results(file,max_threads=MAX_THREADS, retries=MAX_RETRIES)
To run process_website() through the ScrapeOps proxy as well, we wrap the target URL with get_scrapeops_url() before handing it to the driver:

proxy_url = get_scrapeops_url(url)
driver.get(proxy_url)

With that one change in place, here is our finished, production-ready code.
import os
import csv
import json
import time
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
from dataclasses import dataclass, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "wait": 3000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Setup Selenium WebDriver
def setup_driver():
    options = Options()
    options.add_argument("--headless")  # Run in headless mode for efficiency
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    rank: int = 0
    rank_change: int = 0
    average_visit: str = ""
    pages_per_visit: float = 0.0
    bounce_rate: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                else:
                    value = getattr(self, field.name)
                    setattr(self, field.name, value.strip())


@dataclass
class CompetitorData:
    name: str = ""
    url: str = ""
    affinity: str = ""
    monthly_visits: str = ""
    category: str = ""
    category_rank: int = None

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                else:
                    value = getattr(self, field.name)
                    setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0

        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()


# Function to scrape search results (fully Selenium-based)
def scrape_search_results(keyword, data_pipeline=None, retries=3):
    url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Initialize WebDriver and load page
            driver = setup_driver()
            scrapeops_proxy_url = get_scrapeops_url(url)
            driver.get(scrapeops_proxy_url)
            time.sleep(3)  # Allow page to load
            logger.info(f"Opened URL: {url}")

            # Find all rows of the search results table
            rows = driver.find_elements(By.CSS_SELECTOR, "tr.top-table__row")

            rank = 1
            for row in rows:
                site_name = row.find_element(By.CSS_SELECTOR, "a.tw-table__compare").text.strip()
                link = f"https://www.similarweb.com/website/{site_name}/"

                # Rank change processing
                rank_change_holder = row.find_element(By.CSS_SELECTOR, "td.top-table__column--rank-change")
                rank_change = 0
                up_or_down = rank_change_holder.find_element(By.TAG_NAME, "span").get_attribute("class").split()[-1]
                if "change--up" in up_or_down:
                    rank_change += int(rank_change_holder.text.strip())
                elif "change--down" in up_or_down:
                    rank_change -= int(rank_change_holder.text.strip())

                average_visit = row.find_element(By.CSS_SELECTOR, "span.tw-table__avg-visit-duration").text.strip()
                pages_per_visit = float(row.find_element(By.CSS_SELECTOR, "span.tw-table__pages-per-visit").text.strip())
                bounce_rate = row.find_element(By.CSS_SELECTOR, "span.tw-table__bounce-rate").text.strip()

                # Create data object
                search_data = SearchData(
                    name=site_name,
                    url=link,
                    rank=rank,
                    rank_change=rank_change,
                    average_visit=average_visit,
                    pages_per_visit=pages_per_visit,
                    bounce_rate=bounce_rate
                )
                rank += 1
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}, retries left {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max retries exceeded: {retries}")


# Function to process and scrape all search results concurrently
def start_scrape(keywords, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )


# Function to process websites (Selenium-based) and extract competitor data
def process_website(row, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = setup_driver()
            scrapeops_proxy_url = get_scrapeops_url(url)
            driver.get(scrapeops_proxy_url)
            time.sleep(3)  # Allow page to load

            # Check if blocked by a modal or warning
            try:
                blocked_modal = driver.find_element(By.CSS_SELECTOR, "div.wa-limit-modal")
                if blocked_modal:
                    raise Exception("Blocked by modal")
            except:
                pass  # No blocking modal

            # Extract competitor data
            competitors = driver.find_elements(By.CSS_SELECTOR, "div.wa-competitors__list-item")
            competitor_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}_competitors.csv")

            for competitor in competitors:
                site_name = competitor.find_element(By.CSS_SELECTOR, "span.wa-competitors__list-item-title").text.strip()
                link = f"https://www.similarweb.com/website/{site_name}/"
                affinity = competitor.find_element(By.CSS_SELECTOR, "span.app-progress__value").text.strip()
                target_spans = competitor.find_elements(By.CSS_SELECTOR, "span.wa-competitors__list-column")

                monthly_visits = target_spans[2].text.strip()
                category = target_spans[3].text.strip()
                category_rank = int(target_spans[4].text.replace("#", "").replace(",", "").replace("--", "0").strip())

                competitor_data = CompetitorData(
                    name=site_name,
                    url=link,
                    affinity=affinity,
                    monthly_visits=monthly_visits,
                    category=category,
                    category_rank=category_rank
                )
                competitor_pipeline.add_data(competitor_data)

            competitor_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {url}, Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_website,
                reader,
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5

    logger.info(f"Crawl starting...")

    # Example keywords to scrape
    keyword_list = [
        {"category": "arts-and-entertainment", "subcategory": "humor"},
        {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
    ]

    aggregate_files = []

    # Crawl and save results
    filename = "arts-and-entertainment"
    crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
    start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    # Process each CSV file
    for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To tune your results, you can change MAX_RETRIES, MAX_THREADS, the keyword_list, or the output filename inside main. Take another look at main below if you need a refresher.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5

    logger.info(f"Crawl starting...")

    # Example keywords to scrape
    keyword_list = [
        {"category": "arts-and-entertainment", "subcategory": "humor"},
        {"category": "arts-and-entertainment", "subcategory": "animation-and-comics"}
    ]

    aggregate_files = []

    # Crawl and save results
    filename = "arts-and-entertainment"
    crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
    start_scrape(keyword_list, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    # Process each CSV file
    for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
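If you want to crawl other categories, note that each dict in keyword_list simply fills in the URL template used by scrape_search_results(). For example:

# Each keyword dict maps to a SimilarWeb "top websites" URL
keyword = {"category": "arts-and-entertainment", "subcategory": "humor"}
url = f"https://www.similarweb.com/top-websites/{keyword['category']}/{keyword['subcategory']}/"
print(url)  # https://www.similarweb.com/top-websites/arts-and-entertainment/humor/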
Whenever you scrape a site, pay attention to its Terms and Conditions and its robots.txt file. Ignoring these rules could result in account suspension or even a permanent ban. You can view these for SimilarWeb by checking the links below. If you're unsure about your scraper, you should talk to an attorney.