

How to Scrape Leboncoin With Selenium

Leboncoin was established in 2006 and has been around for almost 20 years. It serves as a primary platform for second-hand items, real estate, and job listings. Scraping data from Leboncoin, however, can be very challenging.

On top of an effective anti-bot system, users are required to accept tracking cookies before they can access many listings. Nevertheless, with the right approach it is entirely possible to extract product data from the site.

In this project, we will focus on scraping cars from Leboncoin, but the same method can be applied to scrape almost any type of data from the platform.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Leboncoin

If you want to scrape Leboncoin but lack the time to read or write code, feel free to use our scraper below!

  1. Create a new folder for your project and include a config.json file in it.
  2. In the configuration file, insert your ScrapeOps API key as follows: {"api_key": "your-super-secret-api-key"}.
  3. Next, take the code provided below and paste it into a new Python file.

import os
import re
import csv
import json
import logging
import time
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")

@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())

# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))
logger.info(f"Opened URL: {url}")

# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]

# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")

logger.info(f"Successfully parsed data from: {url}")
success = True

except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1

driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")

# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False

chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))

script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
json_data = json.loads(script_element.get_attribute("text"))

print(json_data)

safe_filename = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', row['name'])  # Replace characters that are invalid in filenames with '_'

vehicle_pipeline = DataPipeline(f"{safe_filename}.csv")

# Not every ad that matches the keyword is an actual vehicle listing (some are parts ads, e.g. Ford Mustang parts), so every field is read with .get() and a default
vehicle_data = VehicleData(
name=json_data.get("name", "No name"),
description=json_data.get("description", "No description"),
price=json_data.get("offers", {}).get("price", 0),
currency=json_data.get("offers", {}).get("priceCurrency", "No currency"),
brand=json_data.get("brand", {}).get("name", "No brand"),
model=json_data.get("model", "No model"),
year=json_data.get("vehicleModelDate", "No year"),
mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)),
transmission=json_data.get("vehicleTransmission", "No transmission")
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries - tries}")
finally:
driver.quit()
tries += 1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"

logger.info(f"Crawl starting...")

keyword_list = ["ford mustang"]
aggregate_files = []

for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

To adjust your result, change any of the following:

  • MAX_RETRIES: The maximum number of retry attempts for failed requests.
  • MAX_THREADS: The maximum number of threads to run concurrently while scraping.
  • PAGES: The number of search result pages to scrape per keyword.
  • LOCATION: The country code used to route requests during scraping.
  • keyword_list: The list of search keywords for which the script will collect product details.

How To Architect Our Leboncoin Scraper

While scraping Leboncoin, we will stick to a structure similar to what we have used in most parts of the "How To Scrape" series.

  • Our first requirement is a search crawler, which will conduct a search and save the results into a CSV file.
  • Following that, a product scraper will gather and save detailed data on each car scraped during the process.

We will build the crawler in the following steps:

  • Parsing the search results.
  • Managing pagination to handle result batches.
  • Storing the parsed data.
  • Using concurrency to process multiple search pages simultaneously.
  • Incorporating proxies to bypass anti-bot systems.

We will construct the scraper using these steps:

  • Parsing the product pages.
  • Reading the saved data.
  • Storing the newly parsed details.
  • Leveraging concurrency to handle multiple products at the same time.
  • Using proxy integration to bypass anti-bot systems.

Understanding How To Scrape Leboncoin

Scraping Leboncoin can be somewhat challenging. To extract the data, it’s important to first identify its location.

In the next sections, we’ll explore how to find these pages, understand their layout, and locate their data.

Additionally, we’ll cover how to manage pagination and control geolocation using the ScrapeOps Proxy Aggregator.


Step 1: How To Request Leboncoin Pages

Similar to other websites, the process begins with a GET request. When a browser accesses a site, it sends a GET request to the server and displays the page upon receiving the response.

A crawler needs to send a GET request to fetch search pages, and a scraper uses a GET request to collect product data.

For fetching search results, the crawler sends a GET request. Refer to the example URL in the screenshot below:

https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
  • In this URL, text is the query parameter and ford+mustang is its value, so text=ford+mustang searches for "Ford Mustang".

The base URLs will follow this format:

https://www.leboncoin.fr/recherche?text={FORMATTED_KEYWORD}
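As a quick sketch, this is how a search URL can be assembled from a raw keyword (the build_search_url helper is illustrative only; the actual scraper builds the URL inline):

def build_search_url(keyword):
    # Spaces become "+" so the keyword fits the text query parameter
    formatted_keyword = keyword.replace(" ", "+")
    return f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"

print(build_search_url("ford mustang"))
# https://www.leboncoin.fr/recherche?text=ford+mustang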

Search Results Page

The next screenshot contains a page for an individual product. The URL is:

https://www.leboncoin.fr/ad/voitures/2844784378

URLs could be reconstructed using this format:

https://www.leboncoin.fr/ad/voitures/{LISTING_ID}

However, since we’ll be scraping URLs during the crawl, reconstructing them won’t be required.

Product Page


Step 2: How To Extract Data From Leboncoin Results and Pages

Each listing is enclosed in an a element that has a data-test-id of ad. This is visible in the screenshot below.

Search Results Page HTML Inspection

Let’s now examine our product data. The product data is contained within a nested JSON blob.

Below, there are two screenshots: one without the cookie prompt and another with it.

Search Results Page HTML Inspection

Search Results Page Cookies HTML Inspection

Since the JSON blob appears on both pages, clicking the cookie button is unnecessary.
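To make both selectors concrete, here is a minimal standalone sketch that opens a search page and a product page and pulls out the ad cards and the JSON blob. It assumes Chrome and webdriver-manager are installed, and note that without the ScrapeOps proxy (added later in this guide) Leboncoin's anti-bot system may block these plain requests:

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

options = Options()
options.add_argument("--headless")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

# Search results: every listing is an <a> element with data-test-id="ad"
driver.get("https://www.leboncoin.fr/recherche?text=ford+mustang")
cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")
print(f"Found {len(cards)} listing cards")

# Product page: the details live inside a JSON-LD <script> tag
driver.get("https://www.leboncoin.fr/ad/voitures/2844784378")
blob = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
print(blob.get_attribute("text")[:200])

driver.quit()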


Step 3: How To Control Pagination

Refer back to the URL we discussed earlier. Pagination is straightforward; take a look:

https://www.leboncoin.fr/recherche?text=ford+mustang&page=2

The page=2 parameter tells the Leboncoin server to display the second page of results. Our complete URLs will follow this structure:

https://www.leboncoin.fr/recherche?text=ford+mustang&page={page_number+1}

Since Python starts counting at 0, we use page_number+1.
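For example, here is how the paginated URLs come out for the first three pages; page_number is zero-based, matching the loops used later in this guide:

keyword = "ford mustang"
formatted_keyword = keyword.replace(" ", "+")

for page_number in range(3):
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    print(url)
# https://www.leboncoin.fr/recherche?text=ford+mustang&page=1
# https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
# https://www.leboncoin.fr/recherche?text=ford+mustang&page=3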


Step 4: Geo-located Data

Geolocation can be fully managed using the ScrapeOps Proxy API.

When communicating with ScrapeOps, a country parameter can be included. This allows us to specify a location, and ScrapeOps routes the request through that location.

  • To appear as if located in the US, set "country": "us".
  • To appear as if located in the UK, set "country": "uk".

The full list of countries is available here.
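Here is a small sketch of how the country parameter fits into a ScrapeOps proxy request (the API key is a placeholder); the get_scrapeops_url() helper later in this article wraps exactly this logic:

from urllib.parse import urlencode

payload = {
    "api_key": "your-super-secret-api-key",
    "url": "https://www.leboncoin.fr/recherche?text=ford+mustang",
    "country": "uk",  # route the request through a UK location
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)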


Setting Up Our Leboncoin Scraper Project

To begin, execute the commands below to set up:

Create a New Project Folder

mkdir leboncoin-scraper

cd leboncoin-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install selenium
pip install webdriver-manager

Build A Leboncoin Search Crawler

We’re now ready to create our search crawler. In the next sections, we’ll incorporate these features into it:

  • Parsing
  • Pagination
  • Data Storage
  • Concurrency
  • Proxy Integration

Step 1: Create Simple Search Data Parser

To begin, we need a script that includes our basic structure. The code below accomplishes this by adding the essential components: error handling, retry logic, and a parsing function.

If you’re learning web scraping, focus closely on the parsing function, scrape_search_results().


import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
tries = 0
success = False

# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Opened URL: {url}")

# Wait for the results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")

# Extract information from the card
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]

search_data = {
"name": name,
"url": link,
"price": price,
"currency": currency
}

print(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")

logger.info(f"Successfully parsed data from: {url}")
success = True

except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1

if not success:
driver.quit()
raise Exception(f"Max Retries exceeded: {retries}")

driver.quit()

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

# INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []

# Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

Here’s how the data extraction process works:

  • All listings are enclosed within an a element, which we locate using driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']").
  • The card.get_attribute("href") retrieves the href, which we combine with the domain name to generate a link for each listing.
  • We extract p elements using card.find_element(By.TAG_NAME, "p").
  • The listing names are obtained with name_element.get_attribute("title").replace("/", "-").replace(" ", "-").
  • The price string is fetched using card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']").text, and string splitting is applied to extract both the price and currency.
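For instance, the price/currency split from the last bullet works like this (a standalone sketch; the example string assumes a price rendered as "25 000 €"):

price_string = "25 000 €"        # example of the text shown on a listing card
price = price_string[:-1]        # everything except the final character -> "25 000 "
currency = price_string[-1]      # the final character -> "€"
print(price.strip(), currency)   # 25 000 €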

Step 2: Add Pagination

The pagination process relies on the page parameter. Paginated URLs follow this structure:

https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}

To crawl multiple pages, we need to implement a new function, start_scrape(). This function uses a for loop to scrape a specified range of pages.

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)

Take a look at the code below to see how everything works together.


import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Opened URL: {url}")

# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")

try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]

search_data = {
"name": name,
"url": link,
"price": price,
"currency": currency
}

print(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")

logger.info(f"Successfully parsed data from: {url}")
success = True

except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1

if not success:
driver.quit()
raise Exception(f"Max Retries exceeded: {retries}")

driver.quit()

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

# INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []

# Job Processes
for keyword in keyword_list:
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

Our paginated URLs follow this format:

https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}

The function start_scrape() enables us to crawl across multiple pages.


Step 3: Storing the Scraped Data

The primary purpose of scraping is data storage. By saving the data, we can review it later and create programs to process it. For this, we need to store the data in a CSV file.

To achieve this, we require a dataclass to represent the objects we want to save, as well as a DataPipeline to store these objects and eliminate duplicates.

Below is the SearchData class, which represents the data objects we have been extracting.


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip() if value else f"No {field.name}")
Here is our DataPipeline. We use it to pipe SearchData objects into our CSV file.


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = self.storage_queue[:]
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
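And a short sketch of the pipeline on its own (the filename and values are illustrative). Duplicate names are dropped with a warning, and close_pipeline() flushes whatever is still queued to the CSV file:

pipeline = DataPipeline(csv_filename="example-output.csv")
pipeline.add_data(SearchData(name="Ford Mustang GT", url="https://www.leboncoin.fr/ad/voitures/2844784378", price="25 000", currency="€"))
pipeline.add_data(SearchData(name="Ford Mustang GT", url="...", price="25 000", currency="€"))  # duplicate name -> dropped
pipeline.close_pipeline()  # writes the remaining queue to example-output.csv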

Here is the updated code:


import os
import csv
import json
import logging
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")

# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Opened URL: {url}")

# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")

try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]

# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")

logger.info(f"Successfully parsed data from: {url}")
success = True

except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1

driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")

# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info("Crawl starting...")

keyword_list = ["ford mustang"]
aggregate_files = []

for keyword in keyword_list:
filename = keyword.replace(" ", "-") + ".csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")

logger.info("Crawl complete.")
  • Within the main function, we initialize a new DataPipeline and pass it to start_scrape(), which then forwards it to scrape_search_results().
  • When objects are parsed, they are converted into SearchData and added to the DataPipeline using the add_data() method.
  • After the crawling process is complete, we finalize by closing the pipeline with the close_pipeline() method.

Step 4: Adding Concurrency

Recall how we initially wrote start_scrape() using a for loop?

We will now enhance it for better speed and efficiency by replacing the for loop with a more powerful tool: ThreadPoolExecutor.

This approach allows us to run a specific function across multiple threads simultaneously.

Below is the updated start_scrape() function.

import concurrent.futures

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

  • Take note of the arguments provided to executor.map().
  • The function scrape_search_results is what we want to execute on each thread, while the remaining arguments are lists that will be passed into scrape_search_results() as parameters.
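If it helps to see the fan-out in isolation, here is a toy sketch of the same pattern with a dummy function (fake_scrape is purely illustrative and stands in for scrape_search_results()):

import concurrent.futures

def fake_scrape(keyword, location, page_number, data_pipeline, retries):
    # Stand-in for scrape_search_results(); just reports the arguments it received
    print(f"scraping '{keyword}' page {page_number + 1} in {location} (retries={retries})")

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(
        fake_scrape,
        ["ford mustang"] * pages,  # one keyword per page
        ["us"] * pages,            # one location per page
        range(pages),              # page numbers 0, 1, 2
        [None] * pages,            # data_pipeline placeholder
        [3] * pages                # retries per page
    )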

Step 5: Bypassing Anti-Bots

Although it’s not the most robust anti-bot system we've seen in this series, Leboncoin does implement an anti-bot system that will detect and block our scraper if precautions aren’t taken.

Leboncoin Blocked Page

We will create a simple function that accepts a URL as input and returns a ScrapeOps Proxied URL.

Take a look at get_scrapeops_url().


from urllib.parse import urlencode

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

Here is the complete code for our crawler:

import os  
import csv
import json
import logging
import time
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")

# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))
logger.info(f"Opened URL: {url}")

# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")

try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]

# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")

logger.info(f"Successfully parsed data from: {url}")
success = True

except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1

driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")

# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info("Crawl starting...")

keyword_list = ["ford mustang"]
aggregate_files = []

for keyword in keyword_list:
filename = keyword.replace(" ", "-") + ".csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(filename)

logger.info("Crawl complete.")

Make sure to replace the URL passed to the driver with the proxy URL:

driver.get(get_scrapeops_url(url, location))


Step 6: Production Run

We’ll now test the crawler in a production environment by scraping three pages of Leboncoin listings.

The number of threads will be set to 5. Although only 3 of the 5 threads will be used during the crawl, all 5 will be fully utilized later in the process.

Check out the implementation in our main block below.

if __name__ == "__main__":  
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"

logger.info("Crawl starting...")

keyword_list = ["ford mustang"]
aggregate_files = []

for keyword in keyword_list:
filename = keyword.replace(" ", "-") + ".csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(filename)

logger.info("Crawl complete.")

You can modify any of the following settings as needed:

  • keyword_list: Holds the list of keywords to be searched and scraped.
  • MAX_RETRIES: Determines how many times the scraper will attempt to fetch a page after encountering an error.
  • MAX_THREADS: Sets the maximum number of threads available for concurrent scraping.
  • PAGES: Indicates the number of pages to scrape for each keyword.
  • LOCATION: Specifies the geographic origin of the scraping requests.

We successfully crawled 3 pages in 46.1198 seconds, averaging 15.37 seconds per page (46.1198 seconds / 3 pages).


Build A Leboncoin Scraper

Now it’s time to scrape detailed product data from Leboncoin.

In the following sections, we’ll create a scraper that uses the crawler’s CSV report to extract in-depth information about each product.


Step 1: Create Simple Product Data Parser

We’ll begin with a parsing function, incorporating error handling and retry logic as before. Check it out below.


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--window-size=1920,1080")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            driver.get(url)

            script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
            json_data = json.loads(script_element.get_attribute("text"))

            print(json_data)
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries - tries}")
        finally:
            driver.quit()
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
  • All product data is embedded inside a script tag with the type application/ld+json.
  • For now, we’re printing this data, but later, it will be stored.
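The blob uses schema.org-style vehicle properties. A trimmed, hypothetical example of its shape is shown below; only the keys our parser reads later are included, and the values are invented:

json_data = {
    "name": "Ford Mustang GT 5.0",
    "description": "Ford Mustang GT, très bon état...",
    "offers": {"price": 25000, "priceCurrency": "EUR"},
    "brand": {"name": "Ford"},
    "model": "Mustang",
    "vehicleModelDate": "2018",
    "mileageFromOdometer": {"value": 45000},
    "vehicleTransmission": "manual",
}
print(json_data.get("offers", {}).get("price", 0))  # 25000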

Step 2: Loading URLs To Scrape

Our parsing function requires a URL to work. We’ll use the URLs saved during the crawl. Let’s write a new function similar to start_scrape().

Instead of processing a sequential list of pages, this function will load the URLs from our CSV file into an array and apply process_item() to each one.

Below is the process_results() function.


def process_results(csv_file, location, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)

Take a look at the full code below.


import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")

# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))
logger.info(f"Opened URL: {url}")

# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]

# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")

logger.info(f"Successfully parsed data from: {url}")
success = True

except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1

driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")

# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False

chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(url)

script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
json_data = json.loads(script_element.get_attribute("text"))

print(json_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries - tries}")
finally:
driver.quit()
tries += 1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def process_results(csv_file, location, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_item(row, location, retries=retries)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

keyword_list = ["ford mustang"]
aggregate_files = []

for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

As before, storing our data is essential; without it, the scrape would serve no purpose. Since we already have a working DataPipeline, all we need is an additional dataclass. This new dataclass will be named VehicleData.

Check out VehicleData below.


@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                value = getattr(self, field.name)
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    setattr(self, field.name, value.strip())
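As a quick sanity check with made-up values, VehicleData behaves like SearchData for string fields: empty strings fall back to "No <field>" while numeric fields are left alone:

vehicle = VehicleData(
    name="Ford Mustang GT 5.0",
    description="",        # empty -> becomes "No description"
    price=25000,
    currency="EUR",
    brand="Ford",
    model="Mustang",
    year="2018",
    mileage=45000,
    transmission="manual",
)
print(vehicle.description)  # No description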

In the updated code, we initialize a new DataPipeline within process_item() and pass VehicleData into it.


import os
import re
import csv
import json
import logging
import time
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")

@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())

# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))
logger.info(f"Opened URL: {url}")

# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]

# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")

logger.info(f"Successfully parsed data from: {url}")
success = True

except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1

driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")

# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False

chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(url)

script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
json_data = json.loads(script_element.get_attribute("text"))

print(json_data)

safe_filename = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', row['name'])  # Replace characters that are invalid in filenames with '_'

vehicle_pipeline = DataPipeline(f"{safe_filename}.csv")

# Use .get() defaults because not every ad has the same structure; some listings that match the keyword (e.g. Ford Mustang parts) are not actually vehicles.
vehicle_data = VehicleData(
name=json_data.get("name", "No name"),
description=json_data.get("description", "No description"),
price=json_data.get("offers", {}).get("price", 0),
currency=json_data.get("offers", {}).get("priceCurrency", "No currency"),
brand=json_data.get("brand", {}).get("name", "No brand"),
model=json_data.get("model", "No model"),
year=json_data.get("vehicleModelDate", "No year"),
mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)),
transmission=json_data.get("vehicleTransmission", "No transmission")
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries - tries}")
finally:
driver.quit()
tries += 1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def process_results(csv_file, location, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_item(row, location, retries=retries)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

keyword_list = ["ford mustang"]
aggregate_files = []

for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

  • VehicleData represents the detailed information extracted from each listing page.
  • Just like SearchData, each VehicleData object is saved to a CSV file through the DataPipeline (see the short sketch below).
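
If it helps to see that flow in isolation, here is a minimal sketch of how a parsed JSON-LD payload maps onto VehicleData and is written out through DataPipeline. It assumes the VehicleData and DataPipeline classes from the scraper above are in scope; the sample dictionary and output filename are purely illustrative.

# Illustrative JSON-LD payload; real listings vary and may omit fields.
sample_json = {
    "name": "Ford Mustang GT",
    "description": "Well maintained, second owner.",
    "offers": {"price": 35000, "priceCurrency": "EUR"},
    "brand": {"name": "Ford"},
    "model": "Mustang",
    "vehicleModelDate": "2018",
    "mileageFromOdometer": {"value": 42000},
    "vehicleTransmission": "Manual",
}

pipeline = DataPipeline(csv_filename="example-vehicle.csv")
pipeline.add_data(VehicleData(
    name=sample_json.get("name", "No name"),
    description=sample_json.get("description", "No description"),
    price=sample_json.get("offers", {}).get("price", 0),
    currency=sample_json.get("offers", {}).get("priceCurrency", "No currency"),
    brand=sample_json.get("brand", {}).get("name", "No brand"),
    model=sample_json.get("model", "No model"),
    year=sample_json.get("vehicleModelDate", "No year"),
    mileage=int(sample_json.get("mileageFromOdometer", {}).get("value", 0)),
    transmission=sample_json.get("vehicleTransmission", "No transmission"),
))
pipeline.close_pipeline()  # flushes anything left in the queue to example-vehicle.csv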

Step 4: Adding Concurrency

Now it’s time to incorporate concurrency again. As before, we’ll replace the for loop with ThreadPoolExecutor.

Check out the code snippet below.


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
  • This time, process_item is the function we want to execute across multiple threads.
  • All other arguments for process_item are passed in as lists, just as we did previously; the short sketch below shows how executor.map pairs them up.
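
Here is a tiny, self-contained sketch of that pairing, using a stand-in function instead of process_item (the URLs are placeholders). executor.map takes one element from each iterable per call, so every thread receives one row plus the repeated location and retry values.

import concurrent.futures

# Stand-in for process_item: just prints the arguments each call receives.
def show_args(row, location, retries):
    print(row["url"], location, retries)

rows = [
    {"url": "https://www.leboncoin.fr/example-ad-1"},
    {"url": "https://www.leboncoin.fr/example-ad-2"},
]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(
        show_args,
        rows,                   # one dict per call
        ["us"] * len(rows),     # location repeated for every call
        [3] * len(rows),        # retries repeated for every call
    )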

Step 5: Bypassing Anti-Bots

At this stage, bypassing anti-bots is straightforward. We simply need to apply get_scrapeops_url() in one more place.

This time, we wrap the URL we pass to driver.get() inside the process_item() function.

driver.get(get_scrapeops_url(url, location))  
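
If you want to confirm what the wrapped request looks like, you can print the proxied URL for a sample listing. This assumes get_scrapeops_url() and your API key from the scraper above are in scope, and the listing URL is just a placeholder.

# Placeholder listing URL, purely to inspect the generated proxy URL.
sample_url = "https://www.leboncoin.fr/ad/voitures/0000000000"
print(get_scrapeops_url(sample_url, "us"))
# Expected shape (your key and the encoding will differ):
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.leboncoin.fr%2F...&country=us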

Below is our production-ready code.


import os
import re
import csv
import json
import time
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")

@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())

# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))
logger.info(f"Opened URL: {url}")

# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]

# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")

logger.info(f"Successfully parsed data from: {url}")
success = True

except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1

driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")

# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False

chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))

script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
json_data = json.loads(script_element.get_attribute("text"))

print(json_data)

safe_filename = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', row['name']) # Replace characters that are invalid in filenames with '_'

vehicle_pipeline = DataPipeline(f"{safe_filename}.csv")

# Use .get() defaults because not every ad has the same structure; some listings that match the keyword (e.g. Ford Mustang parts) are not actually vehicles.
vehicle_data = VehicleData(
name=json_data.get("name", "No name"),
description=json_data.get("description", "No description"),
price=json_data.get("offers", {}).get("price", 0),
currency=json_data.get("offers", {}).get("priceCurrency", "No currency"),
brand=json_data.get("brand", {}).get("name", "No brand"),
model=json_data.get("model", "No model"),
year=json_data.get("vehicleModelDate", "No year"),
mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)),
transmission=json_data.get("vehicleTransmission", "No transmission")
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries - tries}")
finally:
driver.quit()
tries += 1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

keyword_list = ["ford mustang"]
aggregate_files = []

for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)


Step 6: Production Run

We will use the same settings as before. First, we’ll perform a crawl of 3 pages, and then we’ll scrape each result obtained from that crawl.

If you’d like a refresher, here’s our main.


if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"

logger.info(f"Crawl starting...")

keyword_list = ["ford mustang"]
aggregate_files = []

for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

If you remember from earlier, our crawl took 46.1198 seconds.

On this run, we generated a CSV file with 83 results. The full run took 858.5097 seconds. 858.5097 - 46.1198 = 812.3899 seconds spent scraping.

812.3899 seconds / 83 products = 9.7878 seconds per product.
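
As a quick sanity check, the per-product figure is just the scraping time divided by the number of results:

# Reproducing the arithmetic from the run above.
total_runtime = 858.5097   # full run, in seconds
crawl_time = 46.1198       # crawl portion, in seconds
results = 83               # rows scraped from the crawl CSV

scrape_time = total_runtime - crawl_time
print(f"{scrape_time:.4f} seconds scraping, {scrape_time / results:.4f} seconds per product")
# -> 812.3899 seconds scraping, 9.7878 seconds per product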


Legal and Ethical Considerations

Scraping public information is generally legal, and in this article, we focused on scraping public data.

However, scraping private data (data restricted behind a login page) falls under a different set of privacy and intellectual property laws. If you’re unsure about the legality of your scraper, it’s best to consult an attorney.

Although our scraping was legal, Leboncoin has specific Terms and Conditions as well as a robots.txt file that outline their expectations for users.

Ignoring these policies could result in being banned from the site. You can review Leboncoin's Terms and Conditions and robots.txt file on their site before scraping.

NOTE: The Terms and Conditions are in French!


Conclusion

You’ve now learned how to crawl and scrape Leboncoin and experienced the proxy functionality of ScrapeOps firsthand! You’ve built iteratively and gained an understanding of key concepts such as parsing, pagination, data storage, concurrency, and proxy integration.

For more information on the technologies used in this article, explore the documentation for Selenium, webdriver-manager, and the ScrapeOps Proxy API.


More Python Web Scraping Guides

At ScrapeOps we have plenty of guides and tutorials for you to follow.

We love Python so much, we even wrote the playbook on scraping with it!

If you want to learn more from our "How To Scrape" series, check out the links below.