How to Scrape Leboncoin With Requests and BeautifulSoup
Leboncoin has been around for nearly 20 years (founded in 2006). It's a go-to for second-hand goods, real estate, and even job offers. However, Leboncoin can be extremely difficult to scrape. On top of a strong anti-bot system, the site prompts users to accept tracking cookies before viewing many of its listings. Even so, we can still retrieve product data if we know where to look.
Today, we'll be scraping cars on Leboncoin, but this project applies to just about anything you'd want to scrape from the site.
- TLDR: How to Scrape Leboncoin
- How To Architect Our Scraper
- Understanding How To Scrape Leboncoin
- Setting Up Our Leboncoin Scraper
- Build A Leboncoin Search Crawler
- Build A Leboncoin Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Leboncoin
If you're looking to scrape Leboncoin but don't have time to code or read, go ahead and use our scraper below!
- Make a new project folder with a config.json file.
- Inside your config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
- Then copy/paste the code below into a new Python file.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")
for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]
search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
script_text = soup.select_one("script[type='application/ld+json']").text
json_data = json.loads(script_text)
vehicle_pipeline = DataPipeline(f"{row['name']}.csv")
vehicle_data = VehicleData(
name=json_data["name"],
description=json_data["description"],
price=json_data["offers"]["price"],
currency=json_data["offers"]["priceCurrency"],
brand=json_data["brand"]["name"],
model=json_data["model"],
year=json_data["vehicleModelDate"],
mileage=int(json_data["mileageFromOdometer"]["value"]),
transmission=json_data["vehicleTransmission"]
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To adjust your results, change any of the following:
- MAX_RETRIES: Maximum number of retry attempts for failed HTTP requests.
- MAX_THREADS: Maximum number of threads that will run concurrently during the scraping process.
- PAGES: How many pages of search results to scrape for each keyword.
- LOCATION: The geographic location or country code for the scraping process.
- keyword_list: A list of product keywords for which the script will perform searches and scrape product information.
How To Architect Our Leboncoin Scraper
When scraping Leboncoin, we'll follow a similar structure to most everything we've built in this "How To Scrape" series.
First, we need a search crawler. The crawler will perform a search and save our search results to a CSV file.
Next, our product scraper will retrieve and store detailed information about each of the cars we scrape during the crawl.
Our crawler will be built in the following steps:
- Parsing search results.
- Pagination to control our result batches.
- Data Storage for our parsed data.
- Concurrency to parse multiple search pages at once.
- Proxy Integration to bypass anti-bots.
We'll use these steps to build our scraper:
- Parsing product pages.
- Read the stored data.
- Store the newly parsed data.
- Concurrency to parse multiple products simultaneously.
- Proxy Integration to bypass anti-bots.
Understanding How To Scrape Leboncoin
Scraping Leboncoin can be a little bit tricky. Before extracting the data, we need to know where it is!
In the coming sections, we'll take a look at how to get these pages, how they're laid out, and where their data is located. We also need to know how to control our pagination and how to control our geolocation with the ScrapeOps Proxy Aggregator.
Step 1: How To Request Leboncoin Pages
Just like any other site, we always begin with a GET request.
- When you visit a site with your browser, it makes a GET request to the server and displays the page after receiving the response.
- Our crawler needs to perform a GET to retrieve our search pages.
- Our scraper will also use a GET to retrieve product data.
Our search crawler will be performing a GET for the search results. Take a look at the URL in the screenshot below:
https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
text=ford+mustang holds our search query. text represents the query parameter and ford+mustang represents a keyword search for ford mustang.
Our base URLs will be laid out like this:
https://www.leboncoin.fr/recherche?text={FORMATTED_KEYWORD}
This next screenshot holds an individual product page. The URL is:
https://www.leboncoin.fr/ad/voitures/2844784378
We could reconstruct URLs with the following format:
https://www.leboncoin.fr/ad/voitures/{LISTING_ID}
but we'll be scraping their URLs during our crawl, so that won't be necessary.
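To make those URL formats concrete, here's a small sketch of how you might assemble them in Python. build_search_url() and build_listing_url() are just illustrative helpers, not part of the final scraper.
from urllib.parse import quote_plus

def build_search_url(keyword, page=1):
    # The text param holds the keyword; page selects the result page
    return f"https://www.leboncoin.fr/recherche?text={quote_plus(keyword)}&page={page}"

def build_listing_url(listing_id):
    # Car listings live under /ad/voitures/{LISTING_ID}
    return f"https://www.leboncoin.fr/ad/voitures/{listing_id}"

print(build_search_url("ford mustang", page=2))
# https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
print(build_listing_url(2844784378))
# https://www.leboncoin.fr/ad/voitures/2844784378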
Step 2: How To Extract Data From Leboncoin Results and Pages
Now, let's take a look at how to pull data from the pages we just looked at. First, we'll look at the data in the search results. Then, we'll look at our product data.
Each listing is wrapped in an a element with a data-test-id of ad. You can see this in the shot below.
Now, let's look at our product data. Our product data comes nested in a JSON blob. Below are two screenshots, one where we're not prompted to accept cookies and one with the cookie prompt. The JSON blob is present on both pages, so we don't need to worry about clicking the cookie button.
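As a rough preview, pulling that JSON blob out of a product page with BeautifulSoup looks something like the sketch below. The URL is just the example listing from above, and in practice a direct request like this will likely be blocked; in the full scraper the request goes through a proxy.
import json
import requests
from bs4 import BeautifulSoup

url = "https://www.leboncoin.fr/ad/voitures/2844784378"  # example listing URL
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")

# The product data sits inside a <script type="application/ld+json"> tag
script_tag = soup.select_one("script[type='application/ld+json']")
if script_tag:
    listing = json.loads(script_tag.text)
    print(listing.get("name"), listing.get("offers", {}).get("price"))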
Step 3: How To Control Pagination
Think back to our URL from earlier. Pagination is pretty self-explanatory; take a look at it:
https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
page=2 tells the Leboncoin server that we want page 2 of the results. Our full URLs will look like this:
https://www.leboncoin.fr/recherche?text=ford+mustang&page={page_number+1}
We use page_number+1 because Python begins counting at 0.
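In other words, page 0 of our Python loop maps to page=1 on Leboncoin. A quick sketch of the offset:
keyword = "ford mustang"
formatted_keyword = keyword.replace(" ", "+")
for page_number in range(3):
    # range(3) yields 0, 1, 2 -> Leboncoin pages 1, 2, 3
    print(f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}")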
Step 4: Geolocated Data
Our geolocation can be handled entirely through the ScrapeOps Proxy API.
When talking to ScrapeOps, we can pass a country param. This parameter allows us to set a custom location and ScrapeOps will route our request through that location.
- If we want to appear in the US, we use the setting "country": "us".
- If we want to appear in the UK, we can pass "country": "uk".
The full list of countries is available here.
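Concretely, the country code just gets added to the payload we send to the ScrapeOps proxy, alongside our API key and the target URL. A minimal sketch (the API key here is a placeholder):
from urllib.parse import urlencode

payload = {
    "api_key": "your-super-secret-api-key",  # placeholder
    "url": "https://www.leboncoin.fr/recherche?text=ford+mustang",
    "country": "uk",  # route this request through the UK
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))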
Setting Up Our Leboncoin Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir leboncoin-scraper
cd leboncoin-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
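On Windows, the activation command is slightly different:
venv\Scripts\activate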
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build A Leboncoin Search Crawler
We're ready to build our search crawler. In the following sections, we'll add the following features to it.
- Parsing
- Pagination
- Data Storage
- Concurrency
- Proxy Integration
Step 1: Create Simple Search Data Parser
To start, we need a script with our basic structure: error handling, retry logic, and a parsing function. In the code below, we're going to do just that.
If you're learning how to scrape, pay close attention to the parsing function, scrape_search_results()
.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")
for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]
search_data = {
"name": name,
"url": url,
"price": price,
"currency": currency
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
Look at how our data gets extracted:
- All listings are wrapped in an a element; we find them with soup.select("a[data-test-id='ad']").
- card.get("href") gives us the href. We format this with our domain name to create a link to each listing.
- We get our p elements with card.find_all("p").
- p_elements[0].get("title").replace("/", "-").replace(" ", "-") gives us the name of each listing.
- card.select_one("span[data-qa-id='aditem_price']").text gets our price_string. We use string slicing to get both the price and currency from this, as shown in the quick example below.
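For example, if a listing's price text came through as "25000€" (a made-up value just to illustrate the slicing), the last character is the currency symbol and everything before it is the price:
price_string = "25000€"       # hypothetical value pulled from the price span
price = price_string[:-1]     # "25000"
currency = price_string[-1]   # "€"
print(price, currency)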
Step 2: Add Pagination
Pagination is controlled with a single parameter, page
. Our paginated URLs look like this:
https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}
We also need a way to crawl a list of pages. To do this, we'll write another function, start_scrape().
Here is our new start_scrape()
function. It uses a for
loop to allow us to scrape a list of pages.
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
If you look at the code below, you'll see how it all fits together.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")
for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]
search_data = {
"name": name,
"url": url,
"price": price,
"currency": currency
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- Our paginated URLs look like this: https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}.
- start_scrape() allows us to crawl multiple pages.
Step 3: Storing the Scraped Data
Data storage is the reason we're scraping in the first place. When we store our data, we can review it later and also write programs that read it. We'll store our data in a CSV file. To do that, we need a dataclass to represent the objects we want to store, and a DataPipeline to store these objects and filter out duplicates.
Here is our SearchData class. It represents the data objects we've been extracting.
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
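As a quick illustration of what __post_init__ buys us (the values here are made up): empty strings get a readable placeholder and stray whitespace is stripped.
item = SearchData(name="  Ford Mustang GT  ", url="", price="25000", currency="€")
print(item.name)  # "Ford Mustang GT" -> whitespace stripped
print(item.url)   # "No url" -> empty string replaced with a default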
Here is our DataPipeline. We use it to pipe SearchData objects into our CSV file.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
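Before wiring it into the crawler, here's a minimal sketch of the pipeline on its own (the filename and values are made up):
pipeline = DataPipeline(csv_filename="test-output.csv")
row = SearchData(name="Ford Mustang GT", url="https://www.leboncoin.fr/ad/voitures/2844784378", price="25000", currency="€")
pipeline.add_data(row)
pipeline.add_data(row)       # same name -> logged as a duplicate and dropped
pipeline.close_pipeline()    # flushes the remaining queue to test-output.csv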
You can see how these work in our updated code below.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")
for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]
search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- From inside our main, we open a new DataPipeline and pass it into start_scrape(), which passes it into scrape_search_results().
- When we parse objects, we turn them into SearchData and pass them into the DataPipeline with the add_data() method.
- Once we're finished crawling, we close the pipeline with the close_pipeline() method.
Step 4: Adding Concurrency
Remember when we wrote start_scrape()
with a for
loop?
Now we're going to make it faster and more efficient. Here, we'll replace that for loop with something much more powerful... ThreadPoolExecutor. This gives us the ability to call a specific function of our choice on multiple threads.
Here is our rewritten start_scrape()
function.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
Pay attention to the arguments we use with executor.map():
- scrape_search_results is the function we want called on each thread.
- All other args are lists of arguments to be passed into scrape_search_results().
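If you haven't used executor.map() with several iterables before, here's a tiny, self-contained example of the pattern (nothing to do with Leboncoin, just to show how the argument lists line up):
import concurrent.futures

def greet(name, punctuation):
    print(f"Hello, {name}{punctuation}")

names = ["Alice", "Bob", "Carol"]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # The i-th call receives the i-th element of each iterable:
    # greet("Alice", "!"), greet("Bob", "!"), greet("Carol", "!")
    executor.map(greet, names, ["!"] * len(names))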
Step 5: Bypassing Anti-Bots
While it's not the strongest anti-bot system we've encountered in this series, Leboncoin does have an anti-bot system in place and it will find and block our scraper.
We're going to write a simple function that takes in a URL and spits out a ScrapeOps proxied URL. Check out get_scrapeops_url().
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
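Any URL we want routed through the proxy simply gets wrapped in this function before the request goes out. A quick sketch of the call (assuming requests is imported and a valid API key is loaded):
target = "https://www.leboncoin.fr/recherche?text=ford+mustang&page=1"
response = requests.get(get_scrapeops_url(target, location="us"))
print(response.status_code)  # 200 when the proxy fetched the page successfully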
With this function, our crawler is complete.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")
for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]
search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Now, we're going to test our crawler in production. We'll scrape 3 pages of Leboncoin listings. We'll set our threads to 5. While we're only using 3 of our 5 threads on the crawl, our scrape will make full use of all 5 later on.
Take a look at our main.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Feel free to change any of the following:
- keyword_list: Contains a list of keywords to be searched and scraped.
- MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
- PAGES: Specifies the number of pages to scrape for each keyword.
- LOCATION: Defines the geographic location from which the scraping requests appear to originate.
Take a look at our results below.
We crawled 3 pages in 23.591 seconds. 23.591 seconds / 3 pages = 7.864 seconds per page.
Build A Leboncoin Scraper
Now, it's time to scrape Leboncoin product data. In the coming sections, we're going to build a scraper that reads our crawler's CSV report and scrapes detailed information about each product.
Step 1: Create Simple Product Data Parser
Time to start with a parsing function. Like before, we'll add error handling and retry logic. Feel free to take a look at it below.
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
script_text = soup.select_one("script[type='application/ld+json']").text
json_data = json.loads(script_text)
print(json_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
- All of our product data gets embedded within a script tag with a type of application/ld+json.
- At the moment, we're printing this data, but we'll be storing it later.
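To give you a sense of what we're parsing, here's a trimmed, hypothetical example of the kind of JSON blob a vehicle listing might contain. The exact values vary per listing, but these are the keys our scraper will read later on:
json_data = {
    "name": "Ford Mustang GT",
    "description": "Ford Mustang GT in great condition",
    "offers": {"price": 25000, "priceCurrency": "EUR"},
    "brand": {"name": "Ford"},
    "model": "Mustang",
    "vehicleModelDate": "2018",
    "mileageFromOdometer": {"value": "60000"},
    "vehicleTransmission": "Manuelle"
}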
Step 2: Loading URLs To Scrape
To use our parsing function, it needs a URL, so we'll use the URLs we saved during the crawl. Let's create another function similar to start_scrape(). Instead of scraping a numbered list of pages, this one will read our CSV file into an array and run process_item() on each row.
Here is our process_results()
function.
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
Take a look at the full code below.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")
for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]
search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
script_text = soup.select_one("script[type='application/ld+json']").text
json_data = json.loads(script_text)
print(json_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
Just like before, we need to store our data. The scrape would be pointless if we didn't. We've already got a functional DataPipeline; we just need another dataclass. We're going to call this one VehicleData.
Take a look at VehicleData
below.
@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
In our updated code, we open a new DataPipeline
from inside process_item()
and pass VehicleData
into it.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")
for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]
search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
script_text = soup.select_one("script[type='application/ld+json']").text
json_data = json.loads(script_text)
vehicle_pipeline = DataPipeline(f"{row['name']}.csv")
vehicle_data = VehicleData(
name=json_data["name"],
description=json_data["description"],
price=json_data["offers"]["price"],
currency=json_data["offers"]["priceCurrency"],
brand=json_data["brand"]["name"],
model=json_data["model"],
year=json_data["vehicleModelDate"],
mileage=int(json_data["mileageFromOdometer"]["value"]),
transmission=json_data["vehicleTransmission"]
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_item(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- VehicleData is used to represent the detailed information we pull when scraping these objects.
- Just like with our SearchData, we save it to a CSV file through the DataPipeline.
Step 4: Adding Concurrency
Time to add concurrency again. Like before, we'll use ThreadPoolExecutor
to replace our for
loop. Take a look at the snippet below.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
- process_item is the function we wish to call on multiple threads this time.
- All other args to process_item get passed in as arrays, just like before.
Step 5: Bypassing Anti-Bots
At this point, bypassing anti-bots is super easy. We just need to use get_scrapeops_url() in one more part of our code. This time, we'll wrap the URL we request from inside the process_item() function.
response = requests.get(get_scrapeops_url(url, location=location))
Our production-ready code is available below.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
price: str = ""
currency: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class VehicleData:
name: str = ""
description: str = ""
price: int = 0
currency: str = ""
brand: str = ""
model: str = ""
year: str = ""
mileage: int = 0
transmission: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
link_cards = soup.select("a[data-test-id='ad']")
for card in link_cards:
href = card.get("href")
link = f"https://www.leboncoin.fr{href}"
p_elements = card.find_all("p")
name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
price_string = card.select_one("span[data-qa-id='aditem_price']").text
price = price_string[:-1]
currency = price_string[-1]
search_data = SearchData(
name=name,
url=link,
price=price,
currency=currency
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_item(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
script_text = soup.select_one("script[type='application/ld+json']").text
json_data = json.loads(script_text)
vehicle_pipeline = DataPipeline(f"{row['name']}.csv")
vehicle_data = VehicleData(
name=json_data["name"],
description=json_data["description"],
price=json_data["offers"]["price"],
currency=json_data["offers"]["priceCurrency"],
brand=json_data["brand"]["name"],
model=json_data["model"],
year=json_data["vehicleModelDate"],
mileage=int(json_data["mileageFromOdometer"]["value"]),
transmission=json_data["vehicleTransmission"]
)
vehicle_pipeline.add_data(vehicle_data)
vehicle_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_item,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
We'll run with the same settings as before. We're going to do a 3 page crawl and then scrape each result from the crawl afterward.
If you need to see it again, here is our main.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["ford mustang"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
If you remember from earlier, our crawl took 23.591 seconds. On this run, we generated a CSV file with 87 results. The full run took 558.617 seconds. 558.617 - 23.591 = 535.026 seconds spent scraping. 535.026 seconds / 87 products = 6.149 seconds per product.
Legal and Ethical Considerations
Scraping public information is usually completely legal. In this article, we scraped public data.
When you scrape private data (data gated behind a login page), you are subject to an entirely different set of privacy and intellectual property laws. If you're unsure of your scraper, consult an attorney.
While our scrape was legal, Leboncoin has their own Terms and Conditions and robots.txt
that they expect people to follow. Failure to respect these policies can even get you banned from the site. You can take a look at them below.
NOTE: The Terms and Conditions are in French!
Conclusion
You now know how to crawl and scrape Leboncoin. You've also seen the proxy capability of ScrapeOps firsthand! You should know what it feels like to build in iterations, and you should also understand the following concepts: parsing, pagination, data storage, concurrency, and proxy integration. To learn more about the tech we used in this article, check out the links below.
More Python Web Scraping Guides
At ScrapeOps we have plenty of guides and tutorials for you to follow. We love Python so much, we even wrote the playbook on scraping with it!
If you want to learn more from our "How To Scrape" series, check out the links below.