

How to Scrape Leboncoin With Requests and BeautifulSoup

Leboncoin has been around for nearly 20 years (founded in 2006). It's a go-to for second-hand goods, real estate, and even job offers. However, Leboncoin can be extremely difficult to scrape. On top of a strong anti-bot system, they prompt users to accept tracking cookies before viewing many of their listings. Even so, we can still retrieve their product data if we know where to look.

Today, we'll be scraping cars on Leboncoin, but this project applies to just about anything else you'd want to scrape from the site.


TLDR - How to Scrape Leboncoin

If you're looking to scrape Leboncoin but don't have time to code or read, go ahead and use our scraper below!

  1. Make a new project folder with a config.json file.
  2. Inside your config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
  3. Then copy/paste the code below into a new Python file.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)

                vehicle_pipeline = DataPipeline(f"{row['name']}.csv")

                vehicle_data = VehicleData(
                    name=json_data["name"],
                    description=json_data["description"],
                    price=json_data["offers"]["price"],
                    currency=json_data["offers"]["priceCurrency"],
                    brand=json_data["brand"]["name"],
                    model=json_data["model"],
                    year=json_data["vehicleModelDate"],
                    mileage=int(json_data["mileageFromOdometer"]["value"]),
                    transmission=json_data["vehicleTransmission"]
                )
                vehicle_pipeline.add_data(vehicle_data)
                vehicle_pipeline.close_pipeline()

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

To adjust your results, change any of the following:

  • MAX_RETRIES: Maximum number of retry attempts for failed HTTP requests.
  • MAX_THREADS: Maximum number of threads that will run concurrently during the scraping process.
  • PAGES: How many pages of search results to scrape for each keyword.
  • LOCATION: The geographic location or country code for the scraping process.
  • keyword_list: A list of product keywords for which the script will perform searches and scrape product information.

How To Architect Our Leboncoin Scraper

When scraping Leboncoin, we'll follow a structure similar to most of the other projects we've built in this "How To Scrape" series.

First, we need a search crawler. The crawler will perform a search and save our search results to a CSV file.

Next, our product scraper will retrieve and store detailed information about each of the cars we scrape during the crawl.

Our crawler will be built in the following steps:

  1. Parsing search results.
  2. Pagination to control our result batches.
  3. Data Storage for our parsed data.
  4. Concurrency to parse multiple search pages at once.
  5. Proxy Integration to bypass anti-bots.

We'll use these steps to build our scraper:

  1. Parsing product pages.
  2. Read the stored data.
  3. Store the newly parsed data.
  4. Concurrency to parse multiple products simultaneously.
  5. Proxy Integration to bypass anti-bots.

Understanding How To Scrape Leboncoin

Scraping Leboncoin can be a little bit tricky. Before extracting the data, we need to know where it is!

In the coming sections, we'll take a look at how to get these pages, how they're laid out, and where their data is located. We also need to know how to control our pagination and how to control our geolocation with the ScrapeOps Proxy Aggregator.


Step 1: How To Request Leboncoin Pages

Just like any other site, we always begin with a GET request.

  • When you visit a site with your browser, it makes a GET request to the server and displays the page after receiving the response.
  • Our crawler needs to perform a GET to retrieve our search pages.
  • Our scraper will also use a GET to retrieve product data.

Our search crawler will be performing a GET for the search results. Take a look at the URL in the screenshot below:

https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
  • text=ford+mustang holds our search query.
  • text represents the query and ford+mustang represents a keyword search for ford mustang.

Our base URLs will be laid out like this:

https://www.leboncoin.fr/recherche?text={FORMATTED_KEYWORD}
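For example, here's a quick sketch (using a placeholder keyword) of how we'll turn a search term into one of these URLs:

# Sketch: turning a keyword into a Leboncoin search URL.
# "ford mustang" is just an example value.
keyword = "ford mustang"
formatted_keyword = keyword.replace(" ", "+")

url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
print(url)
# https://www.leboncoin.fr/recherche?text=ford+mustang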

Search Results Page

This next screenshot holds an individual product page. The URL is:

https://www.leboncoin.fr/ad/voitures/2844784378

We could reconstruct URLs with the following format:

https://www.leboncoin.fr/ad/voitures/{LISTING_ID}

but we'll be scraping their URLs during our crawl, so that won't be necessary.
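If you ever did need to build one by hand, a sketch might look like the snippet below; the listing ID is just the example from the URL above.

# Hypothetical only: we collect full listing URLs during the crawl,
# so we never actually need to rebuild them from an ID.
listing_id = "2844784378"
product_url = f"https://www.leboncoin.fr/ad/voitures/{listing_id}"
print(product_url)
# https://www.leboncoin.fr/ad/voitures/2844784378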

Product Page


Step 2: How To Extract Data From Leboncoin Results and Pages

Now, let's take a look at how to pull data from the pages we just looked at. First, we'll look at the data in the search results. Then, we'll look at our product data.

Each listing is wrapped in an a element with a data-test-id of ad. You can see this in the screenshot below.

Search Results Page HTML Inspection

Now, let's look at our product data. Our product data comes nested in a JSON blob. Below are two screenshots, one where we're not prompted to accept cookies and one with the cookie prompt. The JSON blob is present on both pages, so we don't need to worry about clicking the cookie button.
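Here's a minimal sketch of how that extraction works. The html variable below is a hand-written stand-in, not a real Leboncoin page, but the selector is the same one we'll use later.

# Sketch: pulling the ld+json blob out of a page with BeautifulSoup.
import json
from bs4 import BeautifulSoup

# Stand-in HTML: a real product page embeds a much larger blob.
html = """
<html><head>
<script type="application/ld+json">{"name": "Ford Mustang", "offers": {"price": 35000, "priceCurrency": "EUR"}}</script>
</head></html>
"""

soup = BeautifulSoup(html, "html.parser")
script_text = soup.select_one("script[type='application/ld+json']").text
json_data = json.loads(script_text)
print(json_data["name"], json_data["offers"]["price"])
# Ford Mustang 35000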

Search Results Page HTML Inspection

Search Results Page Cookies HTML Inspection


Step 3: How To Control Pagination

Think back to our URL from earlier. Pagination is pretty self-explanatory; take a look at it:

https://www.leboncoin.fr/recherche?text=ford+mustang&page=2

page=2 tells the Leboncoin server that we want page 2 of the results. Our full URLs will look like this:

https://www.leboncoin.fr/recherche?text=ford+mustang&page={page_number+1}

We use page_number+1 because Python begins counting at 0.
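As a quick illustration, the loop below (a sketch with a hard-coded keyword) generates the first three paginated URLs:

# Sketch: generating paginated search URLs.
formatted_keyword = "ford+mustang"
pages = 3

for page_number in range(pages):
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    print(url)
# ...&page=1
# ...&page=2
# ...&page=3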


Step 4: Geolocated Data

Our geolocation can be handled entirely through the ScrapeOps Proxy API.

When talking to ScrapeOps, we can pass a country param. This parameter allows us to set a custom location and ScrapeOps will route our request through that location.

  • If we want to appear in the US, we use the setting "country": "us".
  • If we want to appear in the UK, we can pass "country": "uk".

The full list of countries is available here.
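Here's a rough sketch of how the same request payload changes with different country values; the API key below is a placeholder.

# Sketch: the same target URL routed through two different countries.
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder
target_url = "https://www.leboncoin.fr/recherche?text=ford+mustang"

for country in ["us", "uk"]:
    payload = {"api_key": API_KEY, "url": target_url, "country": country}
    print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))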


Setting Up Our Leboncoin Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir leboncoin-scraper

cd leboncoin-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4
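You'll also need the config.json file described in the TLDR section sitting next to your script. If you want, you can run a quick sanity check like the one below before moving on; this is purely optional and not part of the scraper itself.

# Optional sanity check: confirm config.json exists and holds an API key.
import json
import os

if not os.path.isfile("config.json"):
    raise SystemExit("config.json not found - create it in your project folder.")

with open("config.json", "r") as config_file:
    config = json.load(config_file)

if not config.get("api_key"):
    raise SystemExit("config.json is missing the 'api_key' field.")

print("config.json looks good.")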

Build A Leboncoin Search Crawler

We're ready to build our search crawler. In the sections below, we'll add the following features to it:

  1. Parsing
  2. Pagination
  3. Data Storage
  4. Concurrency
  5. Proxy Integration

Step 1: Create Simple Search Data Parser

To start, we need a script with our basic structure. In the code below, we do just that, adding error handling, retry logic, and a parsing function.

If you're learning how to scrape, pay close attention to the parsing function, scrape_search_results().

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = {
                    "name": name,
                    "url": link,
                    "price": price,
                    "currency": currency
                }

                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")

Look at how our data gets extracted:

  • All listings are wrapped in an a element; we find them with soup.select("a[data-test-id='ad']").
  • card.get("href") gives us the href. We format this with our domain name to create a link to each listing.
  • We get our p elements with card.find_all("p").
  • p_elements[0].get("title").replace("/", "-").replace(" ", "-") gives us the name of each listing.
  • card.select_one("span[data-qa-id='aditem_price']").text gets our price_string. We use string slicing to pull both the price and the currency out of it, as shown in the short example below.
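Here's a tiny, self-contained illustration of that slicing step, using a made-up price string in the shape the code above expects:

# Sketch: splitting a price string into price and currency by slicing.
price_string = "35000€"          # example value only

price = price_string[:-1]        # everything except the last character
currency = price_string[-1]      # just the last character

print(price, currency)
# 35000 €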

Step 2: Add Pagination

Pagination is controlled with a single parameter, page. Our paginated URLs look like this:

https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}

We also need a way to crawl a list of pages. To do this, we'll write another function, start_scrape().

Here is our new start_scrape() function. It uses a for loop to allow us to scrape a list of pages.

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)

If you look at the code below, you'll see how it all fits together.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")


for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]

search_data = {
"name": name,
"url": url,
"price": price,
"currency": currency
}

print(search_data)


logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
  • Our paginated URLs look like this: https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}.

  • start_scrape() allows us to crawl multiple pages.


Step 3: Storing the Scraped Data

Data storage is the reason we're scraping in the first place. When we store our data, we can review it later and write other programs that read it. We'll store our data in a CSV file. We need a dataclass to represent the objects we want to store, and a DataPipeline to save those objects and filter out duplicates.

Here is our SearchData class. It represents the data objects we've been extracting.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
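As a quick illustration of what check_string_fields() does (assuming the class above is already defined), empty strings get default text and other strings get stripped:

# Sketch: SearchData cleans its string fields on construction.
item = SearchData(name="  Ford Mustang GT  ", url="", price="35000", currency="€")
print(item.name)   # "Ford Mustang GT"  (whitespace stripped)
print(item.url)    # "No url"           (empty fields get default text)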

Here is our DataPipeline. We use it to pipe SearchData objects into our CSV file.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
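Here's a short sketch of the pipeline in use, assuming SearchData, DataPipeline, and the logger above are defined (note that close_pipeline() calls time.sleep(), so the full script also needs import time):

# Sketch: adding items to the pipeline and flushing them to a CSV file.
pipeline = DataPipeline(csv_filename="example.csv")

item = SearchData(name="Ford Mustang GT", url="https://www.leboncoin.fr/ad/voitures/123",
                  price="35000", currency="€")
pipeline.add_data(item)
pipeline.add_data(item)      # duplicate name, dropped with a warning
pipeline.close_pipeline()    # flushes the queue to example.csv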

You can see how these work in our updated code below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")


for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]

search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)


logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • From inside of our main, we open a new DataPipeline and pass it into start_scrape() which passes it into scrape_search_results().
  • When we parse objects, we turn them into SearchData and pass them into the DataPipeline with the add_data() method.
  • Once we're finished crawling, we close the pipeline with the close_pipeline() method.

Step 4: Adding Concurrency

Remember when we wrote start_scrape() with a for loop?

Now we're going to make it faster and more efficient by replacing that for loop with something much more powerful: ThreadPoolExecutor. It gives us the ability to call a function of our choice on multiple threads.

Here is our rewritten start_scrape() function.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

Pay attention to the arguments we use with executor.map().

  • scrape_search_results is the function we want called on each thread.
  • All other args are lists of arguments to be passed into scrape_search_results(); the toy example below shows how executor.map() zips them together.
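If the zipping behavior of executor.map() is new to you, here's a toy sketch of the same pattern with a fake function:

# Sketch: executor.map() takes one element from each iterable per call.
import concurrent.futures

def fake_scrape(keyword, location, page):
    print(f"scraping '{keyword}' page {page+1} from {location}")

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(
        fake_scrape,
        ["ford mustang"] * pages,   # same keyword for every call
        ["us"] * pages,             # same location for every call
        range(pages)                # a different page number per call
    )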

Step 5: Bypassing Anti-Bots

While it's not the strongest anti-bot we've encountered in this series, Leboncoin does have an anti-bot system in place, and it will find and block our scraper.

Leboncoin Blocked Page

We're going to write a simple function that takes in a URL and returns a ScrapeOps proxied URL.

Check out get_scrapeops_url().

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
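As a quick sanity check, calling it with a search URL should print something like the line in the comment below (with your key and the URL-encoded target filled in):

# Sketch: what get_scrapeops_url() returns.
print(get_scrapeops_url("https://www.leboncoin.fr/recherche?text=ford+mustang&page=1", location="us"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.leboncoin.fr%2F...&country=us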

With this function, our crawler is complete.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")


for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]

search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)


logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

Now, we're going to test our crawler in production. We'll scrape 3 pages of Leboncoin listings with our thread limit set to 5. While the crawl only uses 3 of those 5 threads (one per page), the scrape later on will make full use of all 5.

Take a look at our main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

Feel free to change any of the following:

  • keyword_list: Contains a list of keywords to be searched and scraped.
  • MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
  • MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
  • PAGES: Specifies the number of pages to scrape for each keyword.
  • LOCATION: Defines the geographic location from which the scraping requests appear to originate.

Take a look at our results below.

Crawler Results Terminal

We crawled 3 pages in 23.591 seconds. 23.591 seconds / 3 pages = 7.864 seconds per page.


Build A Leboncoin Scraper

Now, it's time to scrape Leboncoin product data. In the coming sections, we're going to build a scraper that reads our crawler's CSV report and scrapes detailed information about each product.


Step 1: Create Simple Product Data Parser

Time to start with a parsing function. Like before, we'll add error handling and retry logic. Take a look at it below.

def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)

                print(json_data)

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
  • All of our product data gets embedded within a script tag with a type of application/ld+json.
  • At the moment, we're printing this data, but we'll be storing it later. The simplified example below shows the kinds of fields we'll pull out of it.
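The exact blob varies from listing to listing, but a simplified stand-in (hand-written here, not real data) shows the keys we'll rely on in the next steps:

# Sketch: the fields we'll pull from the ld+json blob.
json_data = {
    "name": "Ford Mustang GT",
    "description": "V8, excellent condition",
    "offers": {"price": 35000, "priceCurrency": "EUR"},
    "brand": {"name": "Ford"},
    "model": "Mustang",
    "vehicleModelDate": "2018",
    "mileageFromOdometer": {"value": "42000"},
    "vehicleTransmission": "manual",
}

print(json_data["offers"]["price"], json_data["offers"]["priceCurrency"])   # 35000 EUR
print(int(json_data["mileageFromOdometer"]["value"]))                       # 42000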

Step 2: Loading URLs To Scrape

To use our parsing function, it needs a URL. We'll use the URLs we saved during the crawl. Let's create another function similar to start_scrape().

Instead of scraping a numbered list of pages, this one will read our CSV file into an array and run process_item() on each row.

Here is our process_results() function.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)

Take a look at the full code below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")


for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]

search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)


logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
script_text = soup.select_one("script[type='application/ld+json']").text
json_data = json.loads(script_text)

print(json_data)

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_item(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

Just like before, we need to store our data. The scrape would be pointless if we didn't. We've already got a functional DataPipeline; we just need another dataclass. We're going to call this one VehicleData.

Take a look at VehicleData below.

@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In our updated code, we open a new DataPipeline from inside process_item() and pass VehicleData into it.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""



def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")


for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]

search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)


logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
script_text = soup.select_one("script[type='application/ld+json']").text
json_data = json.loads(script_text)

vehicle_pipeline = DataPipeline(f"{row['name']}.csv")

vehicle_data = VehicleData(
name=json_data["name"],
description=json_data["description"],
price=json_data["offers"]["price"],
currency=json_data["offers"]["priceCurrency"],
brand=json_data["brand"]["name"],
model=json_data["model"],
year=json_data["vehicleModelDate"],
mileage=int(json_data["mileageFromOdometer"]["value"]),
transmission=json_data["vehicleTransmission"]
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_item(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
  • VehicleData is used to represent the detailed information we pull when scraping these objects.
  • Just like with our SearchData, we save it to a CSV file through the DataPipeline.

Step 4: Adding Concurrency

Time to add concurrency again. Like before, we'll use ThreadPoolExecutor to replace our for loop. Take a look at the snippet below.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
  • process_item is the function we wish to call on multiple threads this time.
  • All other args to process_item get passed in as lists, just like before.

Step 5: Bypassing Anti-Bots

At this point, bypassing anti-bots is super easy. We just need to use get_scrapeops_url() in one more place. This time, we'll use it on the request we make inside the process_item() function.

response = requests.get(get_scrapeops_url(url, location=location))

Our production ready code is available below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""



def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")


for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]

search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)


logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
script_text = soup.select_one("script[type='application/ld+json']").text
json_data = json.loads(script_text)

vehicle_pipeline = DataPipeline(f"{row['name']}.csv")

vehicle_data = VehicleData(
name=json_data["name"],
description=json_data["description"],
price=json_data["offers"]["price"],
currency=json_data["offers"]["priceCurrency"],
brand=json_data["brand"]["name"],
model=json_data["model"],
year=json_data["vehicleModelDate"],
mileage=int(json_data["mileageFromOdometer"]["value"]),
transmission=json_data["vehicleTransmission"]
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

We'll run with the same settings as before: a 3 page crawl, followed by a scrape of each result from the crawl.

If you need to see it again, here is our main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

If you remember from earlier, our crawl took 23.591 seconds. On this run, we generated a CSV file with 87 results. The full run took 558.617 seconds. 558.617 - 23.591 = 535.026 seconds spent scraping. 535.026 seconds / 87 products = 6.149 seconds per product.


Legal and Ethical Considerations

Scraping public information is usually completely legal. In this article, we scraped public data.

When you scrape private data (data gated behind a login page), you are subject to an entirely different set of privacy and intellectual property laws. If you're unsure of your scraper, consult an attorney.

While our scrape was legal, Leboncoin has its own Terms and Conditions and robots.txt that it expects users to follow. Failure to respect these policies can even get you banned from the site. You can review both on the Leboncoin website.

NOTE: The Terms and Conditions are in French!


Conclusion

You now know how to crawl and scrape Leboncoin, and you've seen the proxy capability of ScrapeOps firsthand. You know what it feels like to build in iterations, and you should understand the following concepts: parsing, pagination, data storage, concurrency, and proxy integration. To learn more about the tech we used in this article, check out the links below.


More Python Web Scraping Guides

At ScrapeOps we have plenty of guides and tutorials for you to follow. We love Python so much, we even wrote the playbook on scraping with it!

If you want to learn more from our "How To Scrape" series, check out the links below.