How to Scrape Leboncoin With Selenium
Leboncoin was founded in 2006 and has been around for almost 20 years. It is one of France's primary platforms for second-hand items, real estate, and job listings. Scraping data from Leboncoin, however, can be very challenging.
On top of an effective anti-bot system, the site requires users to accept tracking cookies before many listings become accessible. Nevertheless, with the right approach it is possible to extract product data from the site.
In this project, we will focus on scraping cars from Leboncoin, but the same method can be applied to scrape almost any type of data from the platform.
- TLDR: How to Scrape Leboncoin
- How To Architect Our Scraper
- Understanding How To Scrape Leboncoin
- Setting Up Our Leboncoin Scraper
- Build A Leboncoin Search Crawler
- Build A Leboncoin Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Leboncoin
If you want to scrape Leboncoin but lack the time to read or write code, feel free to use our scraper below!
- Create a new folder for your project and include a config.json file in it.
- In the configuration file, insert your ScrapeOps API key as follows: {"api_key": "your-super-secret-api-key"}.
- Next, take the code provided below and paste it into a new Python file.
import os
import re
import csv
import json
import time
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")
@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))
logger.info(f"Opened URL: {url}")
# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")
for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")
price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]
# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")
logger.info(f"Successfully parsed data from: {url}")
success = True
except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1
driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")
# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))
script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
json_data = json.loads(script_element.get_attribute("text"))
print(json_data)
safe_filename = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', row['name'])  # Replace characters that are invalid in filenames with '_'
vehicle_pipeline = DataPipeline(f"{safe_filename}.csv")
# Use .get() with defaults because not every ad has the same structure; some "ford mustang" results are for parts rather than whole cars
vehicle_data = VehicleData(
name=json_data.get("name", "No name"),
description=json_data.get("description", "No description"),
price=json_data.get("offers", {}).get("price", 0),
currency=json_data.get("offers", {}).get("priceCurrency", "No currency"),
brand=json_data.get("brand", {}).get("name", "No brand"),
model=json_data.get("model", "No model"),
year=json_data.get("vehicleModelDate", "No year"),
mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)),
transmission=json_data.get("vehicleTransmission", "No transmission")
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries - tries}")
finally:
driver.quit()
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
logger.info(f"Crawl starting...")
keyword_list = ["ford mustang"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To adjust your results, change any of the following:
- MAX_RETRIES: The maximum number of retry attempts for failed HTTP requests.
- MAX_THREADS: The maximum number of threads running concurrently during the scrape.
- PAGES: The total number of search result pages to scrape per keyword.
- LOCATION: The country code or geographic location used in the scraping process.
- keyword_list: The list of product keywords the script searches for and scrapes details on.
How To Architect Our Leboncoin Scraper
While scraping Leboncoin, we will stick to a structure similar to what we have used in most parts of the "How To Scrape" series.
- Our first requirement is a search crawler, which will conduct a search and save the results into a CSV file.
- Following that, a product scraper will gather and save detailed data on each car scraped during the process.
We will build the crawler in the following steps:
- Parsing the search results.
- Managing pagination to handle result batches.
- Storing the parsed data.
- Using concurrency to process multiple search pages simultaneously.
- Incorporating proxies to bypass anti-bot systems.
We will construct the scraper using these steps:
- Parsing the product pages.
- Reading the saved data.
- Storing the newly parsed details.
- Leveraging concurrency to handle multiple products at the same time.
- Using proxy integration to bypass anti-bot systems.
Understanding How To Scrape Leboncoin
Scraping Leboncoin can be somewhat challenging. To extract the data, it’s important to first identify its location.
In the next sections, we’ll explore how to find these pages, understand their layout, and locate their data.
Additionally, we’ll cover how to manage pagination and control geolocation using the ScrapeOps Proxy Aggregator.
Step 1: How To Request Leboncoin Pages
Similar to other websites, the process begins with a GET request. When a browser accesses a site, it sends a GET request to the server and displays the page upon receiving the response.
A crawler needs to send a GET request to fetch search pages, and a scraper uses a GET request to collect product data.
For fetching search results, the crawler sends a GET request. Refer to the example URL in the screenshot below:
https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
- Here, text=ford+mustang specifies the search query.
- In this URL structure, text indicates the query, and ford+mustang specifies the keyword search for "Ford Mustang."
The base URLs will follow this format:
https://www.leboncoin.fr/recherche?text={FORMATTED_KEYWORD}
The next screenshot contains a page for an individual product. The URL is:
https://www.leboncoin.fr/ad/voitures/2844784378
URLs could be reconstructed using this format:
https://www.leboncoin.fr/ad/voitures/{LISTING_ID}
However, since we’ll be scraping URLs during the crawl, reconstructing them won’t be required.
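As a quick sketch, both URL patterns can be assembled with simple f-strings; the listing ID below is the one from the example URL above.
keyword = "ford mustang"
formatted_keyword = keyword.replace(" ", "+")

# Search results URL built from the pattern above
search_url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
# Individual listing URL, using the example listing ID from the screenshot
listing_url = "https://www.leboncoin.fr/ad/voitures/2844784378"

print(search_url)  # https://www.leboncoin.fr/recherche?text=ford+mustang
print(listing_url)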
Step 2: How To Extract Data From Leboncoin Results and Pages
Each listing is enclosed in an a element that has a data-test-id of ad. This is visible in the screenshot below.
Let’s now examine our product data. The product data is contained within a nested JSON blob.
Below, there are two screenshots: one without the cookie prompt and another with it.
Since the JSON blob appears on both pages, clicking the cookie button is unnecessary.
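To make that concrete, here is a minimal sketch showing where each piece of data lives; it uses the same selectors the full scripts below rely on. Without the proxy added in later steps, Leboncoin's anti-bot system may well block these plain requests, so treat this purely as an illustration of the page layout.
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

options = Options()
options.add_argument("--headless")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

# Search results page: each listing card is an <a> element with data-test-id="ad"
driver.get("https://www.leboncoin.fr/recherche?text=ford+mustang")
for card in driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']"):
    print(card.get_attribute("href"))

# Individual listing page: the product data sits in a JSON-LD <script> blob
driver.get("https://www.leboncoin.fr/ad/voitures/2844784378")
script = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
print(json.loads(script.get_attribute("text")).get("name"))

driver.quit()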
Step 3: How To Control Pagination
Refer back to the URL we discussed earlier. Pagination is straightforward; take a look:
https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
The page=2 parameter tells the Leboncoin server to display the second page of results. Our complete URLs will follow this structure:
https://www.leboncoin.fr/recherche?text=ford+mustang&page={page_number+1}
Since Python starts counting at 0, we use page_number+1.
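As a quick sanity check, a loop like this (a sketch, not part of the final scraper) prints the first few paginated URLs:
keyword = "ford mustang"
formatted_keyword = keyword.replace(" ", "+")

# Python's range() starts at 0, while Leboncoin's page parameter starts at 1
for page_number in range(3):
    print(f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}")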
Step 4: Geo-located Data
Geolocation can be fully managed using the ScrapeOps Proxy API.
When communicating with ScrapeOps, a country parameter can be included. This allows us to specify a location, and ScrapeOps routes the request through that location.
- To appear as if located in the US, set "country": "us".
- To appear as if located in the UK, set "country": "uk".
The full list of countries is available here.
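This is exactly what the get_scrapeops_url() helper built later in this guide does; here it is in isolation so you can see where the country parameter fits (the API key is a placeholder).
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder

def get_scrapeops_url(url, location="us"):
    # "country" tells ScrapeOps which location to route the request through
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

print(get_scrapeops_url("https://www.leboncoin.fr/recherche?text=ford+mustang", "uk"))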
Setting Up Our Leboncoin Scraper Project
To begin, execute the commands below to set up:
Create a New Project Folder
mkdir leboncoin-scraper
cd leboncoin-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install selenium
pip install webdriver-manager
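The scripts in the following sections also read your ScrapeOps API key from a config.json file in the project folder, so create one now with the same content shown in the TLDR section:
{"api_key": "your-super-secret-api-key"}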
Build A Leboncoin Search Crawler
We’re now ready to create our search crawler. In the next sections, we’ll incorporate these features into it:
- Parsing
- Pagination
- Data Storage
- Concurrency
- Proxy Integration
Step 1: Create Simple Search Data Parser
To begin, we need a script that includes our basic structure. The code below accomplishes this by adding the essential components: error handling, retry logic, and a parsing function.
If you're learning web scraping, focus closely on the parsing function, scrape_search_results().
import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
tries = 0
success = False
# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Opened URL: {url}")
# Wait for the results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")
for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
# Extract information from the card
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")
price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]
search_data = {
"name": name,
"url": link,
"price": price,
"currency": currency
}
print(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")
logger.info(f"Successfully parsed data from: {url}")
success = True
except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1
if not success:
driver.quit()
raise Exception(f"Max Retries exceeded: {retries}")
driver.quit()
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
Here’s how the data extraction process works:
- All listings are enclosed within an a element, which we locate using driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']").
- card.get_attribute("href") retrieves the href, which we combine with the domain name to generate a link for each listing.
- We extract p elements using card.find_element(By.TAG_NAME, "p").
- The listing names are obtained with name_element.get_attribute("title").replace("/", "-").replace(" ", "-").
- The price string is fetched using card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']").text, and string splitting is applied to extract both the price and currency (illustrated in the sketch below).
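Here is a rough illustration of that final split; the exact price text is an assumption (real listings may format it differently), but the last character is the currency symbol and everything before it is the price.
price_string = "25 000 €"       # hypothetical text from the price span
price = price_string[:-1]       # "25 000 " -> everything except the last character
currency = price_string[-1]     # "€"       -> the last character
print(price.strip(), currency)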
Step 2: Add Pagination
The pagination process relies on the page parameter. Paginated URLs follow this structure:
https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}
To crawl multiple pages, we need to implement a new function, start_scrape(). This function uses a for loop to scrape a specified range of pages.
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
Take a look at the code below to see how everything works together.
import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Opened URL: {url}")
# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")
for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")
price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]
search_data = {
"name": name,
"url": link,
"price": price,
"currency": currency
}
print(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")
logger.info(f"Successfully parsed data from: {url}")
success = True
except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1
if not success:
driver.quit()
raise Exception(f"Max Retries exceeded: {retries}")
driver.quit()
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
Our paginated URLs follow this format:
https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}
The function start_scrape() enables us to crawl across multiple pages.
Step 3: Storing the Scraped Data
The primary purpose of scraping is data storage. By saving the data, we can review it later and create programs to process it. For this, we need to store the data in a CSV file.
To achieve this, we require a dataclass to represent the objects we want to save, as well as a DataPipeline to store these objects and eliminate duplicates.
Below is the SearchData class, which represents the data objects we have been extracting.
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")
Here is our DataPipeline. We use it to pipe SearchData objects into our CSV file.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
Here is the updated code:
import os
import csv
import json
import logging
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")
# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Opened URL: {url}")
# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")
for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")
price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]
# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")
logger.info(f"Successfully parsed data from: {url}")
success = True
except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1
driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")
# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info("Crawl starting...")
keyword_list = ["ford mustang"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-") + ".csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(filename)
logger.info("Crawl complete.")
- Within the main function, we initialize a new DataPipeline and pass it to start_scrape(), which then forwards it to scrape_search_results().
- When objects are parsed, they are converted into SearchData and added to the DataPipeline using the add_data() method.
- After the crawling process is complete, we finalize by closing the pipeline with the close_pipeline() method (see the usage sketch after this list).
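Here is a minimal usage sketch of the pipeline on its own, assuming the SearchData and DataPipeline definitions above are in scope; the listing values are placeholders.
pipeline = DataPipeline(csv_filename="example-output.csv")

# The first item is queued; the second has the same name, so it is dropped as a duplicate
pipeline.add_data(SearchData(name="Ford-Mustang-GT", url="https://www.leboncoin.fr/ad/voitures/2844784378", price="25 000", currency="€"))
pipeline.add_data(SearchData(name="Ford-Mustang-GT", url="https://www.leboncoin.fr/ad/voitures/2844784378", price="25 000", currency="€"))

# Flush anything still in the queue to example-output.csv
pipeline.close_pipeline()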
Step 4: Adding Concurrency
Recall how we initially wrote start_scrape() using a for loop?
We will now enhance it for better speed and efficiency by replacing the for loop with a more powerful tool: ThreadPoolExecutor.
This approach allows us to run a specific function across multiple threads simultaneously.
Below is the updated start_scrape() function.
import concurrent.futures
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
- Take note of the arguments provided to executor.map().
- The function scrape_search_results is what we want to execute on each thread, while the remaining arguments are lists that will be passed into scrape_search_results() as parameters (a toy example of this pattern follows below).
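If the list-style arguments look unfamiliar, this small standalone sketch shows the same pattern with a toy function standing in for scrape_search_results(): each call receives one element from each list.
import concurrent.futures

def toy_scrape(keyword, location, page):
    # Stand-in for scrape_search_results(); just prints its arguments
    print(f"Scraping '{keyword}' ({location}), page {page + 1}")

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(
        toy_scrape,
        ["ford mustang"] * pages,  # same keyword for every page
        ["us"] * pages,            # same location for every page
        range(pages)               # a different page number per call
    )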
Step 5: Bypassing Anti-Bots
Although it’s not the most robust anti-bot system we've seen in this series, Leboncoin does implement an anti-bot system that will detect and block our scraper if precautions aren’t taken.
We will create a simple function that accepts a URL as input and returns a ScrapeOps Proxied URL.
Take a look at get_scrapeops_url() below.
from urllib.parse import urlencode
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
Here is the complete code for our crawler
import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")
# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))
logger.info(f"Opened URL: {url}")
# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")
for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")
price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]
# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")
logger.info(f"Successfully parsed data from: {url}")
success = True
except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1
driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")
# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info("Crawl starting...")
keyword_list = ["ford mustang"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-") + ".csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(filename)
logger.info("Crawl complete.")
Make sure to replace the URL passed to the driver with the proxy URL:
driver.get(get_scrapeops_url(url, location))
Step 6: Production Run
We’ll now test the crawler in a production environment by scraping three pages of Leboncoin listings.
The number of threads will be set to 5. Although only 3 of the 5 threads will be used during the crawl, all 5 will be fully utilized later in the process.
Check out the implementation in the main block below.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
logger.info("Crawl starting...")
keyword_list = ["ford mustang"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-") + ".csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(filename)
logger.info("Crawl complete.")
You can modify any of the following settings as needed:
- keyword_list: Holds the list of keywords to be searched and scraped.
- MAX_RETRIES: Determines how many times the scraper will attempt to fetch a page after encountering an error.
- MAX_THREADS: Sets the maximum number of threads available for concurrent scraping.
- PAGES: Indicates the number of pages to scrape for each keyword.
- LOCATION: Specifies the geographic origin of the scraping requests.
We successfully crawled 3 pages in 46.1198 seconds, averaging 15.37 seconds per page (46.1198 seconds / 3 pages).
Build A Leboncoin Scraper
Now it’s time to scrape detailed product data from Leboncoin.
In the following sections, we’ll create a scraper that uses the crawler’s CSV report to extract in-depth information about each product.
Step 1: Create Simple Product Data Parser
We’ll begin with a parsing function, incorporating error handling and retry logic as before. Check it out below.
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
json_data = json.loads(script_element.get_attribute("text"))
print(json_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries - tries}")
finally:
driver.quit()
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
- All product data is embedded inside a script tag with the type application/ld+json.
- For now, we're printing this data, but later, it will be stored (a rough sketch of the blob's shape follows below).
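The exact shape of this blob is controlled by Leboncoin and may change, but based on the fields the scraper reads in the next steps, a vehicle listing's JSON-LD looks roughly like the hypothetical example below (all values invented for illustration).
# Hypothetical structure, inferred from the keys process_item() reads later; values are made up
example_json_data = {
    "name": "Ford Mustang GT",
    "description": "Well-maintained Mustang, second owner...",
    "offers": {"price": 25000, "priceCurrency": "EUR"},
    "brand": {"name": "Ford"},
    "model": "Mustang",
    "vehicleModelDate": "2018",
    "mileageFromOdometer": {"value": 60000},
    "vehicleTransmission": "Manual",
}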
Step 2: Loading URLs To Scrape
Our parsing function requires a URL to work. We'll use the URLs saved during the crawl. Let's write a new function similar to start_scrape().
Instead of processing a sequential list of pages, this function will load the URLs from our CSV file into an array and apply process_item() to each one.
Below is the process_results() function.
def process_results(csv_file, location, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
Take a look at the full code below.
import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")
# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))
logger.info(f"Opened URL: {url}")
# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")
for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")
price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]
# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")
logger.info(f"Successfully parsed data from: {url}")
success = True
except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1
driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")
# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
json_data = json.loads(script_element.get_attribute("text"))
print(json_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries - tries}")
finally:
driver.quit()
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_results(csv_file, location, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
keyword_list = ["ford mustang"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
As before, storing our data is essential; without it, the scrape would serve no purpose. Since we already have a working DataPipeline, all we need is an additional dataclass. This new dataclass will be named VehicleData.
Check out VehicleData below.
@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
In the updated code, we initialize a new DataPipeline within process_item() and pass VehicleData into it.
import os
import re
import csv
import json
import logging
import time
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")
@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(get_scrapeops_url(url, location))
logger.info(f"Opened URL: {url}")
# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")
for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")
price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]
# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")
logger.info(f"Successfully parsed data from: {url}")
success = True
except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1
driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")
# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
json_data = json.loads(script_element.get_attribute("text"))
print(json_data)
safe_filename = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', row['name'])  # Replace characters that are invalid in filenames with '_'
vehicle_pipeline = DataPipeline(f"{safe_filename}.csv")
# Use .get() with defaults because not every ad has the same structure; some "ford mustang" results are for parts rather than whole cars
vehicle_data = VehicleData(
name=json_data.get("name", "No name"),
description=json_data.get("description", "No description"),
price=json_data.get("offers", {}).get("price", 0),
currency=json_data.get("offers", {}).get("priceCurrency", "No currency"),
brand=json_data.get("brand", {}).get("name", "No brand"),
model=json_data.get("model", "No model"),
year=json_data.get("vehicleModelDate", "No year"),
mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)),
transmission=json_data.get("vehicleTransmission", "No transmission")
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries - tries}")
finally:
driver.quit()
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_results(csv_file, location, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
keyword_list = ["ford mustang"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- `VehicleData` is used to represent the detailed information extracted when scraping these listings (a short standalone sketch of this mapping follows below).
- Similar to how we handle `SearchData`, the data is saved to a CSV file using the `DataPipeline`.
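If you want to see how the embedded ld+json maps onto the dataclass without running a full crawl, here is a minimal, self-contained sketch. The `sample_json` payload and the trimmed `MiniVehicle` class are made up for illustration and only mirror a few of the fields our real `VehicleData` uses.

```python
import json
from dataclasses import dataclass

# Hypothetical ld+json payload, trimmed to a few fields a vehicle listing might embed.
sample_json = json.loads("""
{
    "name": "Ford Mustang GT",
    "offers": {"price": 35000, "priceCurrency": "EUR"},
    "brand": {"name": "Ford"},
    "mileageFromOdometer": {"value": "42000"}
}
""")

@dataclass
class MiniVehicle:
    name: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    mileage: int = 0

# Chained .get() calls with defaults keep the parser from crashing when a
# listing (a parts ad, for example) is missing vehicle-specific fields.
vehicle = MiniVehicle(
    name=sample_json.get("name", "No name"),
    price=sample_json.get("offers", {}).get("price", 0),
    currency=sample_json.get("offers", {}).get("priceCurrency", "No currency"),
    brand=sample_json.get("brand", {}).get("name", "No brand"),
    mileage=int(sample_json.get("mileageFromOdometer", {}).get("value", 0)),
)
print(vehicle)
```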
Step 4: Adding Concurrency
Now it’s time to incorporate concurrency again. As before, we’ll replace the `for` loop with `ThreadPoolExecutor`.
Check out the code snippet below.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
- This time, `process_item` is the function we want to execute across multiple threads.
- All other arguments for `process_item` are passed in as arrays, just as we did previously; the standalone sketch below shows how `executor.map()` lines these arrays up.
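If the repeated-list pattern looks unusual, here is a small standalone sketch, using dummy rows instead of real Leboncoin data, that shows how `ThreadPoolExecutor.map()` pairs the iterables up, taking one element from each per call.

```python
import concurrent.futures

def process_item(row, location, retries):
    # Stand-in for the real process_item(): just report what it received.
    print(f"row={row['url']}, location={location}, retries={retries}")

reader = [{"url": "https://example.com/a"}, {"url": "https://example.com/b"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Each row is paired with the same location and retry count.
    executor.map(process_item, reader, ["us"] * len(reader), [3] * len(reader))
```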
Step 5: Bypassing Anti-Bots
At this stage, bypassing anti-bots becomes straightforward. We simply need to apply `get_scrapeops_url()` in another part of the code.
This time, we wrap the URL that `process_item()` passes to `driver.get()`:
driver.get(get_scrapeops_url(url, location))
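To make that concrete, here is roughly what the wrapped URL looks like once `urlencode()` has done its work. The API key and listing URL below are placeholders, not real values.

```python
from urllib.parse import urlencode

# Placeholder values purely for illustration.
API_KEY = "your-super-secret-api-key"
url = "https://www.leboncoin.fr/ad/voitures/1234567890"

payload = {"api_key": API_KEY, "url": url, "country": "us"}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))
# https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.leboncoin.fr%2Fad%2Fvoitures%2F1234567890&country=us
```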
Below is our production-ready code.
import os
import re
import csv
import json
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
import time
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define the SearchData dataclass
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
setattr(self, field.name, value.strip() if value else f"No {field.name}")
@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
value = getattr(self, field.name)
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
# Define the DataPipeline class
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = self.storage_queue[:]
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
# Configure Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
    while tries <= retries and not success:
        # Start a fresh driver on each attempt, since the finally block below quits it
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
try:
driver.get(get_scrapeops_url(url, location))
logger.info(f"Opened URL: {url}")
# Wait for results to load
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
)
link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")
for card in link_cards:
href = card.get_attribute("href")
link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")
try:
name_element = card.find_element(By.TAG_NAME, "p")
name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")
price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
price_string = price_element.text
price = price_string[:-1]
currency = price_string[-1]
# Store data in SearchData and add to pipeline
search_data = SearchData(name=name, url=link, price=price, currency=currency)
data_pipeline.add_data(search_data)
except NoSuchElementException as e:
logger.warning(f"Failed to extract some details for a card: {e}")
logger.info(f"Successfully parsed data from: {url}")
success = True
except TimeoutException as e:
logger.error(f"Timeout occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
finally:
tries += 1
driver.quit()
if not success:
raise Exception(f"Max retries exceeded for {url}")
# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--window-size=1920,1080")
    while tries <= retries and not success:
        # Start a fresh driver on each attempt, since the finally block below quits it
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
try:
driver.get(get_scrapeops_url(url, location))
script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
json_data = json.loads(script_element.get_attribute("text"))
print(json_data)
            safe_filename = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', row['name'])  # Replace characters that are invalid in filenames with '_'
vehicle_pipeline = DataPipeline(f"{safe_filename}.csv")
            # Use .get() with defaults because not every ad exposes the same fields;
            # some listings only match the keyword (e.g. Ford Mustang parts) and lack vehicle data.
vehicle_data = VehicleData(
name=json_data.get("name", "No name"),
description=json_data.get("description", "No description"),
price=json_data.get("offers", {}).get("price", 0),
currency=json_data.get("offers", {}).get("priceCurrency", "No currency"),
brand=json_data.get("brand", {}).get("name", "No brand"),
model=json_data.get("model", "No model"),
year=json_data.get("vehicleModelDate", "No year"),
mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)),
transmission=json_data.get("vehicleTransmission", "No transmission")
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries - tries}")
finally:
driver.quit()
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
keyword_list = ["ford mustang"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
We will use the same settings as before. First, we’ll perform a crawl of 3 pages, and then we’ll scrape each result obtained from that crawl.
If you’d like a refresher, here’s our `main` block.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
logger.info(f"Crawl starting...")
keyword_list = ["ford mustang"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
If you remember from earlier, our crawl took 46.1198 seconds.
On this run, we generated a CSV file with 83 results. The full run took 858.5097 seconds. 858.5097 - 46.1198 = 812.3899 seconds spent scraping.
812.3899 seconds / 83 products = 9.7878 seconds per product.
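If you’d like to reproduce that back-of-the-envelope math yourself:

```python
crawl_seconds = 46.1198
total_seconds = 858.5097
results = 83

scrape_seconds = total_seconds - crawl_seconds   # 812.3899 seconds spent scraping
print(round(scrape_seconds / results, 4))        # ~9.7878 seconds per product
```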
Legal and Ethical Considerations
Scraping public information is generally legal, and in this article, we focused on scraping public data.
However, scraping private data (data restricted behind a login page) falls under a different set of privacy and intellectual property laws. If you’re unsure about the legality of your scraper, it’s best to consult an attorney.
Although our scraping was legal, Leboncoin has specific Terms and Conditions as well as a `robots.txt` file that outline their expectations for users.
Ignoring these policies could result in being banned from the site. You can review both policies on Leboncoin’s website before scraping.
NOTE: The Terms and Conditions are in French!
Conclusion
You’ve now learned how to crawl and scrape Leboncoin and experienced the proxy functionality of ScrapeOps firsthand! You’ve built iteratively and gained an understanding of key concepts such as parsing, pagination, data storage, concurrency, and proxy integration.
For more information on the technologies used in this article, explore the following resources:
More Python Web Scraping Guides
At ScrapeOps we have plenty of guides and tutorials for you to follow.
We love Python so much, we even wrote the playbook on scraping with it!
If you want to learn more from our "How To Scrape" series, check out the links below.