Python Selenium Beginners Series Part 4: Retries & Concurrency

So far in this Python Selenium 6-Part Beginner Series, you learned how to build a basic web scraper and get it to scrape some data from a website in Part 1, clean up the data as it was being scraped in Part 2, and then save the data to a file or database in Part 3.

In Part 4, you’ll learn how to make our scraper more robust and scalable by handling failed requests and using concurrency.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


Python Selenium 6-Part Beginner Series

  • Part 1: Basic Python Selenium Scraper - We'll go over the basics of scraping with Python and build our first Python scraper. (Part 1)

  • Part 2: Cleaning Dirty Data & Dealing With Edge Cases - Web data can be messy, unstructured, and have lots of edge cases. In this tutorial we'll make our scraper robust to these edge cases, using data classes and data cleaning pipelines. (Part 2)

  • Part 3: Storing Data in AWS S3, MySQL & Postgres DBs - There are many different ways we can store the data that we scrape, from databases and CSV files to JSON format and S3 buckets. We'll explore several of these options, talk about their pros and cons, and look at which situations you would use them in. (Part 3)

  • Part 4: Managing Retries & Concurrency - Make our scraper more robust and scalable by handling failed requests and using concurrency. (This article)

  • Part 5: Faking User-Agents & Browser Headers - Make our scraper production ready by using fake user agents & browser headers to make our scrapers look more like real users. (Coming Soon)

  • Part 6: Using Proxies To Avoid Getting Blocked - Explore how to use proxies to bypass anti-bot systems by hiding your real IP address and location. (Coming Soon)


Understanding Scraper Performance Bottlenecks

In any web scraping project, network delay is the first bottleneck. Scraping requires sending numerous requests to a website and processing their responses. Even though each request and response travels over the network in mere fractions of a second, these small delays accumulate and significantly impact scraping speed when many pages are involved (say, 5,000).

Although humans visiting just a few pages wouldn't notice such minor delays, scraping tools sending hundreds or thousands of requests can face delays that stretch into hours. Furthermore, network delay is just one factor impacting scraping speed.

The scraper does not only send and receive requests; it also parses each response, identifies the relevant information, and potentially stores or processes it. While network delays may be minimal, these additional steps are CPU-intensive and can significantly slow down scraping.


Retry Requests and Concurrency Importance

When web scraping, retrying requests and using concurrency are important for several reasons. Retrying requests helps handle temporary network glitches, server errors, rate limits, or connection timeouts, increasing the chances of a successful response.

Common status codes that indicate a retry is worth attempting include:

  • 429: Too many requests
  • 500: Internal server error
  • 502: Bad gateway
  • 503: Service unavailable
  • 504: Gateway timeout

Websites often implement rate limits to control traffic. Retrying with delays can help you stay within these limits and avoid getting blocked. While scraping, you might encounter pages with dynamically loaded content. This may require multiple attempts and retries at intervals to retrieve all the elements.
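
To make this concrete, here is a minimal sketch of retrying with an increasing delay. It assumes a hypothetical fetch callable that returns an HTTP status code; the scraper we build below uses Selenium and a simpler fixed delay instead.

import time

# Status codes from the list above that are usually worth retrying.
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

def fetch_with_retries(fetch, url, retry_limit=3, base_delay=1):
    # `fetch` is any callable that takes a URL and returns an HTTP status code
    # (hypothetical here, purely for illustration).
    delay = base_delay
    status = None
    for attempt in range(1, retry_limit + 1):
        status = fetch(url)
        if status not in RETRYABLE_STATUS_CODES:
            return status  # Success or a non-retryable error: stop retrying.
        print(f"Attempt {attempt} got {status}, waiting {delay}s before retrying...")
        time.sleep(delay)
        delay *= 2  # Back off a little more each time to respect rate limits.
    return status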

Now let’s talk about concurrency. When you make sequential requests to websites, you make one at a time, wait for the response, and then make the next one.

In the diagram below, the blue boxes show the time when your program is actively working, while the red boxes show when it's paused waiting for an I/O operation, such as downloading data from the website, reading data from files, or writing data to files, to complete.

Selenium Web Scraping Playbook - requests sending sequentially

Source: Real Python

However, concurrency allows your program to handle multiple open requests to websites simultaneously, significantly improving performance and efficiency, particularly for time-consuming tasks.

By concurrently sending these requests, your program overlaps the waiting times for responses, reducing the overall waiting time and getting the final results faster.

Selenium Web Scraping Playbook - using threading to send the requests

Source: Real Python
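
To see that overlap in action, here is a small, self-contained toy example (separate from our scraper) that simulates five one-second downloads, first sequentially and then with a pool of threads. The example.com URLs are placeholders used purely for illustration.

import concurrent.futures
import time

def fake_fetch(url):
    time.sleep(1)  # Simulate waiting on the network (an I/O-bound operation)
    return url

urls = [f"https://example.com/page/{i}" for i in range(5)]  # Placeholder URLs

# Sequential: each wait happens one after another (roughly 5 seconds in total).
start = time.time()
for url in urls:
    fake_fetch(url)
print(f"Sequential: {time.time() - start:.1f}s")

# Concurrent: five threads wait at the same time (roughly 1 second in total).
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    list(executor.map(fake_fetch, urls))
print(f"Concurrent: {time.time() - start:.1f}s")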


Retry Logic Mechanism

Let's see how to implement retry logic functionality within our scraper. In part 3 of this series, the start_scrape() function iterated through a list of URLs and extracted data from each. Here, we'll enhance it to make multiple attempts at retrieving data from a webpage.

To implement the retry mechanism, first define the retry limit, which is the maximum number of attempts to scrape a particular URL before giving up.

The function will enter a while loop that continues until one of three conditions is met:

  1. Scraping is successful.
  2. The maximum number of retries (retry_limit) is reached.
  3. An exception occurs that prevents further retries.

Inside the loop, there are several steps to perform:

  1. Navigate to the specified URL: Use driver.get(url) to open the webpage in the Selenium-controlled browser.
  2. Get the response status code (indirectly): Retrieving the response status code directly in Selenium isn't straightforward. Here's how you can achieve it:
    • Retrieve the performance logs using driver.get_log('performance'). These logs contain browser performance data, including network requests and timing information.
    • Pass the retrieved logs to a function called get_status(). This function will analyze the logs to extract the HTTP response status code for the page request.

Here's the code for the get_status() function:

def get_status(logs):
    for log in logs:
        if log["message"]:
            d = json.loads(log["message"])
            try:
                content_type = (
                    "text/html"
                    in d["message"]["params"]["response"]["headers"]["content-type"]
                )
                response_received = d["message"]["method"] == "Network.responseReceived"
                if content_type and response_received:
                    return d["message"]["params"]["response"]["status"]
            except:
                pass
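
Note that driver.get_log("performance") only returns data if performance logging is enabled when the browser is created. The complete script at the end of this article does this with the goog:loggingPrefs capability; a minimal setup looks like this:

from selenium import webdriver

chrome_options = webdriver.ChromeOptions()
# Enable Chrome's performance log so driver.get_log("performance") returns network events
chrome_options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
driver = webdriver.Chrome(options=chrome_options)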

Here's the code that runs the loop and extracts the response status code. You can see that we call the get_status() function, passing the performance logs as an argument.

retry_count = 0
while retry_count < retry_limit:
    driver.get(url)
    logs = driver.get_log("performance")
    response = get_status(logs)
    response_text = driver.page_source

Continuing with the code, a response code of 200 indicates that the webpage loaded successfully. We then check whether the anti-bot check is enabled and, if so, whether our scraper has passed it. If the scraper passes the anti-bot check (or none is enabled), we proceed with normal scraping; otherwise, we retry the request.

if response == 200:
    if anti_bot_check and not passed_anti_bot_check(response_text):
        print("Anti-bot check failed. Retrying...")
        continue

The passed_anti_bot_check function accepts the response text as its argument. It determines whether an anti-bot check has been passed by searching for the string "<title>Robot or human?</title>" within the response text. If the string is present, the function returns False; otherwise, it returns True.

def passed_anti_bot_check(response):
    if "<title>Robot or human?</title>" in response:
        return False
    return True

Once the webpage loads successfully and anti-bot checks are bypassed (if present), we proceed with scraping the products as described in the previous parts of this series. This involves finding links to the next pages of products, if available, and adding them to the list of URLs for further scraping.

Here's the complete code for the start_scrape() function:

def start_scrape(url, retry_limit=3, anti_bot_check=False):
    list_of_urls.remove(url)
    print(f"Scraping URL: {url}")
    retry_count = 0
    while retry_count < retry_limit:
        try:
            driver.get(url)
            logs = driver.get_log("performance")
            response = get_status(logs)
            response_text = driver.page_source

            if response == 200:
                if anti_bot_check and not passed_anti_bot_check(response_text):
                    print("Anti-bot check failed. Retrying...")
                    retry_count += 1  # Count this attempt so a failing check cannot retry forever
                    time.sleep(2)  # Add a small delay before retrying
                    continue
                products = driver.find_elements(By.CLASS_NAME, "product-item")
                for product in products:
                    name = product.find_element(
                        By.CLASS_NAME, "product-item-meta__title"
                    ).text
                    price = product.find_element(By.CLASS_NAME, "price").text
                    url = product.find_element(
                        By.CLASS_NAME, "product-item-meta__title"
                    ).get_attribute("href")
                    data_pipeline.add_product(
                        {"name": name, "price": price, "url": url}
                    )
                try:
                    next_page = driver.find_element(By.CSS_SELECTOR, "a[rel='next']")
                    url = next_page.get_attribute("href")
                    list_of_urls.append(url)
                except:
                    print("No more pages found. Exiting...")
                    return
                return
            else:
                print(f"Received status code: {response}. Retrying...")
        except Exception as e:
            print("Error occurred:", e)
        retry_count += 1
        time.sleep(2)  # Add a small delay before retrying
    print("Failed to scrape:", url)

Concurrency Management

Concurrency is the ability to execute multiple tasks or processes seemingly at the same time. It enables efficient utilization of system resources and can often speed up program execution. Python provides several methods and modules to achieve this; one common technique is multi-threading.

As the name implies, multi-threading refers to the ability of a processor to execute multiple threads concurrently. Operating systems usually create and manage hundreds of threads, switching CPU time between them rapidly.

Switching between tasks occurs so quickly that it creates the illusion of multitasking. It's important to note that the operating system controls thread switching, and developers cannot control it directly.

Using the concurrent.futures module in Python, you can customize the number of threads you want to create to optimize your code. ThreadPoolExecutor is a popular class within this module that enables you to easily execute tasks concurrently using threads. ThreadPoolExecutor is ideal for I/O-bound tasks, where tasks frequently involve waiting for external resources, such as reading files or downloading data.

Here’s a simple piece of code for adding concurrency to your scraper:

def concurrent_scrape(list_of_urls, num_threads=5, retry_limit=3, anti_bot_check=True):
    while len(list_of_urls) > 0:
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            executor.map(
                lambda url: start_scrape(url, retry_limit, anti_bot_check), list_of_urls
            )

The statement with concurrent.futures.ThreadPoolExecutor() as executor: creates a ThreadPoolExecutor object named executor that manages a pool of worker threads. The max_workers parameter is set to the provided num_threads value, controlling the maximum number of threads in the pool.

The code then uses the executor.map method to apply the start_scrape function concurrently to each URL in the list_of_urls. This means the function will be executed for multiple URLs at the same time.

Inside the map function, a lambda function takes a URL as input and calls the start_scrape function with the URL, retry_limit, and anti_bot_check as arguments.

The loop continues iterating until all URLs in the list_of_urls have been processed.
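
If you prefer to track each task individually, for example to log which URL a failed task came from, a functionally similar sketch can use executor.submit() and as_completed() instead of executor.map(). The concurrent_scrape_submit name below is just a hypothetical alternative to the function above.

import concurrent.futures

def concurrent_scrape_submit(list_of_urls, num_threads=5, retry_limit=3, anti_bot_check=True):
    while len(list_of_urls) > 0:
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            # Snapshot the list because start_scrape() mutates it while scraping.
            futures = {
                executor.submit(start_scrape, url, retry_limit, anti_bot_check): url
                for url in list(list_of_urls)
            }
            for future in concurrent.futures.as_completed(futures):
                url = futures[future]
                try:
                    future.result()
                except Exception as e:
                    print(f"Scraping {url} raised an error: {e}")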


Complete Code

Run the code below and see how your scraper becomes more robust and scalable by handling failed requests and using concurrency.

import os
import time
import csv
import json
from dataclasses import dataclass, field, fields, InitVar, asdict
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service

# Define dataclass for Product
@dataclass
class Product:
    name: str = ""
    price_string: InitVar[str] = ""
    price_gb: float = field(init=False)
    price_usd: float = field(init=False)
    url: str = ""

    def __post_init__(self, price_string):
        self.name = self.clean_name()
        self.price_gb = self.clean_price(price_string)
        self.price_usd = self.convert_price_to_usd()
        self.url = self.create_absolute_url()

    # Clean product name
    def clean_name(self):
        if self.name == "":
            return "missing"
        return self.name.strip()

    # Clean price string and convert to float
    def clean_price(self, price_string):
        price_string = price_string.strip()
        price_string = price_string.replace("Sale price\n£", "")
        price_string = price_string.replace("Sale price\nFrom £", "")
        if price_string == "":
            return 0.0
        return float(price_string)

    # Convert price from GBP to USD
    def convert_price_to_usd(self):
        return round(self.price_gb * 1.21, 2)

    # Create absolute URL
    def create_absolute_url(self):
        if self.url == "":
            return "missing"
        return self.url

# Class for managing product data pipeline
class ProductDataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=5):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    # Save product data to CSV
    def save_to_csv(self):
        self.csv_file_open = True
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not products_to_save:
            return
        keys = [field.name for field in fields(products_to_save[0])]
        file_exists = (
            os.path.isfile(self.csv_filename)
            and os.path.getsize(self.csv_filename) > 0
        )

        with open(
            self.csv_filename, mode="a", newline="", encoding="utf-8"
        ) as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()
            for product in products_to_save:
                writer.writerow(asdict(product))
        self.csv_file_open = False

    # Clean raw product data
    def clean_raw_product(self, scraped_data):
        return Product(
            name=scraped_data.get("name", ""),
            price_string=scraped_data.get("price", ""),
            url=scraped_data.get("url", ""),
        )

    # Check for duplicate products
    def is_duplicate(self, product_data):
        if product_data.name in self.names_seen:
            print(f"Duplicate item found. Item dropped.")
            return True
        self.names_seen.append(product_data.name)
        return False

    # Add product to storage queue
    def add_product(self, scraped_data):
        product = self.clean_raw_product(scraped_data)
        if not self.is_duplicate(product):
            self.storage_queue.append(product)
            if (
                len(self.storage_queue) >= self.storage_queue_limit
                and self.csv_file_open == False
            ):
                self.save_to_csv()

    # Close pipeline and save remaining data to CSV
    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

# Start scraping a URL
def start_scrape(url, retry_limit=3, anti_bot_check=False):
    list_of_urls.remove(url)
    print(f"Scraping URL: {url}")
    retry_count = 0
    while retry_count < retry_limit:
        try:
            driver.get(url)
            logs = driver.get_log('performance')
            response = get_status(logs)
            response_text = driver.page_source

            if response == 200:
                if anti_bot_check and not passed_anti_bot_check(response_text):
                    print("Anti-bot check failed. Retrying...")
                    retry_count += 1  # Count this attempt so a failing check cannot retry forever
                    time.sleep(2)  # Add a small delay before retrying
                    continue
                products = driver.find_elements(By.CLASS_NAME, "product-item")
                for product in products:
                    name = product.find_element(
                        By.CLASS_NAME, "product-item-meta__title"
                    ).text
                    price = product.find_element(By.CLASS_NAME, "price").text
                    url = product.find_element(
                        By.CLASS_NAME, "product-item-meta__title"
                    ).get_attribute("href")
                    data_pipeline.add_product(
                        {"name": name, "price": price, "url": url}
                    )
                try:
                    next_page = driver.find_element(
                        By.CSS_SELECTOR, "a[rel='next']"
                    )
                    url = next_page.get_attribute("href")
                    list_of_urls.append(url)
                except:
                    print("No more pages found. Exiting...")
                    return
                return
            else:
                print(f"Received status code: {response}. Retrying...")
        except Exception as e:
            print("Error occurred:", e)
        retry_count += 1
        time.sleep(2)  # Add a small delay before retrying
    print("Failed to scrape:", url)

# Get status code from performance logs
def get_status(logs):
    for log in logs:
        if log['message']:
            d = json.loads(log['message'])
            try:
                content_type = (
                    'text/html'
                    in d['message']['params']['response']['headers']['content-type']
                )
                response_received = d['message']['method'] == 'Network.responseReceived'
                if content_type and response_received:
                    return d['message']['params']['response']['status']
            except:
                pass

# Check if anti-bot check passed
def passed_anti_bot_check(response):
    if "<title>Robot or human?</title>" in response:
        return False
    return True

# Scrape URLs concurrently
def concurrent_scrape(list_of_urls, num_threads=5, retry_limit=3, anti_bot_check=True):
    while len(list_of_urls) > 0:
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            executor.map(
                lambda url: start_scrape(url, retry_limit, anti_bot_check), list_of_urls
            )

# Main execution
if __name__ == "__main__":
    chrome_options = webdriver.ChromeOptions()
    service = Service(ChromeDriverManager().install())
    chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})
    chrome_options.add_argument("--headless")
    driver = webdriver.Chrome(service=service, options=chrome_options)
    data_pipeline = ProductDataPipeline(csv_filename="product_data.csv")
    list_of_urls = [
        "https://www.chocolate.co.uk/collections/all",
    ]
    concurrent_scrape(list_of_urls, num_threads=5)
    data_pipeline.close_pipeline()
    driver.quit()

The final result is: Selenium Web Scraping Playbook - data stored in a csv file


Next Steps

We hope you now have a good understanding of why you need to retry requests and use concurrency when web scraping. This includes how the retry logic works, how to check for anti-bots, and how concurrency management works.

If you would like the code from this example, please check it out on GitHub here!

The next tutorial covers how to make our scraper production-ready by managing our user agents and IPs to avoid getting blocked. (Part 5)