Then check out ScrapeOps, the complete toolkit for web scraping.
You'll need a config.json file that holds your ScrapeOps API key:

```json
{"api_key": "your-super-secret-api-key"}
```
Then add the code below to a Python file:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)

                vehicle_pipeline = DataPipeline(f"{row['name']}.csv")
                vehicle_data = VehicleData(
                    name=json_data["name"],
                    description=json_data["description"],
                    price=json_data["offers"]["price"],
                    currency=json_data["offers"]["priceCurrency"],
                    brand=json_data["brand"]["name"],
                    model=json_data["model"],
                    year=json_data["vehicleModelDate"],
                    mileage=int(json_data["mileageFromOdometer"]["value"]),
                    transmission=json_data["vehicleTransmission"]
                )
                vehicle_pipeline.add_data(vehicle_data)
                vehicle_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
- MAX_RETRIES: Maximum number of retry attempts for failed HTTP requests.
- MAX_THREADS: Maximum number of threads that will run concurrently during the scraping process.
- PAGES: How many pages of search results to scrape for each keyword.
- LOCATION: The geographic location or country code for the scraping process.
- keyword_list: A list of product keywords for which the script will perform searches and scrape product information.

Take a look at this example Leboncoin search URL:

https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
text=ford+mustang holds our search query. text represents the query and ford+mustang represents a keyword search for ford mustang. So our search URLs are laid out like this:

https://www.leboncoin.fr/recherche?text={FORMATTED_KEYWORD}
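To make that concrete, here is a minimal sketch of how a keyword becomes a search URL. The build_search_url() helper is just for illustration; it uses the same space-to-plus formatting the scraper applies later on.

```python
# Illustration only: turn a keyword into a Leboncoin search URL using the
# same keyword.replace(" ", "+") formatting the scraper uses later on.
def build_search_url(keyword):
    formatted_keyword = keyword.replace(" ", "+")
    return f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"

print(build_search_url("ford mustang"))
# https://www.leboncoin.fr/recherche?text=ford+mustang
```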
Individual listing URLs look like this:

https://www.leboncoin.fr/ad/voitures/2844784378

So they follow this format:

https://www.leboncoin.fr/ad/voitures/{LISTING_ID}

On the search page, each result card is an a element with a data-test-id of ad. You can see this in the shot below.

https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
page=2 tells the Leboncoin server that we want page 2 of the results. Our full URLs will look like this:

https://www.leboncoin.fr/recherche?text=ford+mustang&page={page_number+1}

We use page_number+1 because Python begins counting at 0.

When talking to the ScrapeOps Proxy API, we can also pass a country param. This parameter allows us to set a custom location, and ScrapeOps will route our request through that location. To be routed through the US, we pass "country": "us". To be routed through the UK, we pass "country": "uk".
```
mkdir leboncoin-scraper
cd leboncoin-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
```
We start with a parsing function, scrape_search_results().

```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = {
                    "name": name,
                    "url": link,
                    "price": price,
                    "currency": currency
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")
```
- Since each result card is an a element, we find them with soup.select("a[data-test-id='ad']").
- card.get("href") gives us the href. We format this with our domain name to create a link to each listing.
- We find all p elements with card.find_all("p").
- p_elements[0].get("title").replace("/", "-").replace(" ", "-") gives us the name of each listing.
- card.select_one("span[data-qa-id='aditem_price']").text gets our price_string. We use string splitting to get both the price and currency from this.

To paginate our results, we need a parameter called page. Our paginated URLs look like this:

https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}
We also need a new function, start_scrape(). Here is our new start_scrape() function. It uses a for loop to allow us to scrape a list of pages.

```python
def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
```
Here is our updated code with pagination support:

```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = {
                    "name": name,
                    "url": link,
                    "price": price,
                    "currency": currency
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")
```
- Our URLs are now formatted for pagination: https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}.
- start_scrape() allows us to crawl multiple pages.
We need a dataclass to represent the objects we want to store, and we also need a DataPipeline to store these objects and filter out duplicates.

Here is our SearchData class; it represents the data objects we've been extracting.

```python
@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
Next is our DataPipeline. We use it to pipe SearchData objects into our CSV file.

```python
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
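As a quick usage sketch (the filename and field values below are placeholders), we open a pipeline, feed it with add_data(), and close it when we're done so anything left in the queue gets written to the CSV.

```python
# Usage sketch with placeholder values: pipe SearchData objects into a CSV.
pipeline = DataPipeline(csv_filename="ford-mustang.csv")

pipeline.add_data(SearchData(name="Ford-Mustang-GT", url="https://www.leboncoin.fr/ad/voitures/2844784378", price="25 000", currency="€"))
pipeline.add_data(SearchData(name="Ford-Mustang-GT", url="...", price="25 000", currency="€"))  # duplicate name, gets dropped

pipeline.close_pipeline()  # flush whatever is left in the storage queue
```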
Here is our full code with SearchData and the DataPipeline added:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
From main, we open a new DataPipeline and pass it into start_scrape(), which passes it into scrape_search_results(). Inside the parsing function, we build SearchData objects and pass them into the DataPipeline with the add_data() method. Once the crawl finishes, we close the pipeline with the close_pipeline() method.

Remember how we wrote start_scrape() with a for loop? Now we're going to make it faster and more efficient. Here, we'll replace that for loop with something much more powerful... ThreadPoolExecutor. This gives us the ability to call a specific function of our choice on multiple threads.

Here is our rewritten start_scrape() function.

```python
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
```
Pay attention to the arguments of executor.map(): scrape_search_results is the function we want called on each thread. Our other arguments get passed in as arrays, which are then fed into scrape_search_results().
.def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url
Here is our full code with the proxy function and concurrency added:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
Take a look at our updated main; PAGES is now set to 3.

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
- keyword_list: Contains a list of keywords to be searched and scraped.
- MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
- PAGES: Specifies the number of pages to scrape for each keyword.
- LOCATION: Defines the geographic location from which the scraping requests appear to originate.

Next, we need a parsing function for individual listings, process_item().

```python
def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)
                print(json_data)

                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
Our data comes embedded in a script tag with a type of application/ld+json.
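To make that concrete, here is a rough sketch of the kind of object we get back after parsing that blob; the values are invented, but the keys match the fields we pull from it later in the article.

```python
# Invented example values; the keys mirror those used in our parsing code.
json_data = {
    "name": "Ford Mustang GT",
    "description": "Ford Mustang in good condition...",
    "offers": {"price": 25000, "priceCurrency": "EUR"},
    "brand": {"name": "Ford"},
    "model": "Mustang",
    "vehicleModelDate": "2018",
    "mileageFromOdometer": {"value": 45000},
    "vehicleTransmission": "Manual",
}
```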
Next, we need a function similar to start_scrape(). Instead of scraping a numbered list of pages, this one will read our CSV file into an array and run process_item() on each one.

Here is our process_results() function.

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)
```
Here is our full code with process_item() and process_results() added:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)
                print(json_data)

                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
```
Since we already have a DataPipeline, we just need another dataclass. We're going to call this one VehicleData. Take a look at VehicleData below.

```python
@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
Now, we open a new DataPipeline from inside process_item() and pass VehicleData into it.

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)

                vehicle_pipeline = DataPipeline(f"{row['name']}.csv")
                vehicle_data = VehicleData(
                    name=json_data["name"],
                    description=json_data["description"],
                    price=json_data["offers"]["price"],
                    currency=json_data["offers"]["priceCurrency"],
                    brand=json_data["brand"]["name"],
                    model=json_data["model"],
                    year=json_data["vehicleModelDate"],
                    mileage=int(json_data["mileageFromOdometer"]["value"]),
                    transmission=json_data["vehicleTransmission"]
                )
                vehicle_pipeline.add_data(vehicle_data)
                vehicle_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
```
VehicleData is used to represent the detailed information we pull when scraping these objects. Just like SearchData, we save it to a CSV file through the DataPipeline.

Once again, we'll use ThreadPoolExecutor to replace our for loop. Take a look at the snippet below.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
process_item is the function we wish to call on multiple threads this time. All other arguments to process_item get passed in as arrays, just like before.

To finish things off, we'll use get_scrapeops_url() at another part of our code. This time we'll use it on our response from the process_item() function.

```python
response = requests.get(get_scrapeops_url(url, location=location))
```
Here is our full code:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)

                vehicle_pipeline = DataPipeline(f"{row['name']}.csv")
                vehicle_data = VehicleData(
                    name=json_data["name"],
                    description=json_data["description"],
                    price=json_data["offers"]["price"],
                    currency=json_data["offers"]["priceCurrency"],
                    brand=json_data["brand"]["name"],
                    model=json_data["model"],
                    year=json_data["vehicleModelDate"],
                    mileage=int(json_data["mileageFromOdometer"]["value"]),
                    transmission=json_data["vehicleTransmission"]
                )
                vehicle_pipeline.add_data(vehicle_data)
                vehicle_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
Once again, take a look at our updated main; PAGES is now set to 3.

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
Leboncoin has Terms and Conditions and a robots.txt that they expect people to follow. Failure to respect these policies can even get you banned from the site. You can take a look at them below.

NOTE: The Terms and Conditions are in French!

Then check out ScrapeOps, the complete toolkit for web scraping.
Create a new project folder with a config.json file in it:

```json
{"api_key": "your-super-secret-api-key"}
```
Then add the code below to a Python file:

```python
import os
import re
import csv
import json
import time
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Define the SearchData dataclass
@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip() if value else f"No {field.name}")


@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                value = getattr(self, field.name)
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    setattr(self, field.name, value.strip())


# Define the DataPipeline class
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = self.storage_queue[:]
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()


# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    # Configure Selenium WebDriver
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--window-size=1920,1080")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            driver.get(get_scrapeops_url(url, location))
            logger.info(f"Opened URL: {url}")

            # Wait for results to load
            WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
            )

            link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

            for card in link_cards:
                href = card.get_attribute("href")
                link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")

                try:
                    name_element = card.find_element(By.TAG_NAME, "p")
                    name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

                    price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
                    price_string = price_element.text
                    price = price_string[:-1]
                    currency = price_string[-1]

                    # Store data in SearchData and add to pipeline
                    search_data = SearchData(name=name, url=link, price=price, currency=currency)
                    data_pipeline.add_data(search_data)
                except NoSuchElementException as e:
                    logger.warning(f"Failed to extract some details for a card: {e}")

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except TimeoutException as e:
            logger.error(f"Timeout occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
        finally:
            tries += 1

    # Close the browser once this page is finished
    driver.quit()

    if not success:
        raise Exception(f"Max retries exceeded for {url}")


# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--window-size=1920,1080")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            driver.get(get_scrapeops_url(url, location))

            script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
            json_data = json.loads(script_element.get_attribute("text"))
            print(json_data)

            # Replace invalid filename characters with '_'
            safe_filename = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', row['name'])
            vehicle_pipeline = DataPipeline(f"{safe_filename}.csv")

            # Use .get() with defaults because not every ad has the same structure;
            # some results that match the keyword (e.g. Ford Mustang parts) are not full vehicle listings.
            vehicle_data = VehicleData(
                name=json_data.get("name", "No name"),
                description=json_data.get("description", "No description"),
                price=json_data.get("offers", {}).get("price", 0),
                currency=json_data.get("offers", {}).get("priceCurrency", "No currency"),
                brand=json_data.get("brand", {}).get("name", "No brand"),
                model=json_data.get("model", "No model"),
                year=json_data.get("vehicleModelDate", "No year"),
                mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)),
                transmission=json_data.get("vehicleTransmission", "No transmission")
            )
            vehicle_pipeline.add_data(vehicle_data)
            vehicle_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries - tries}")
        finally:
            tries += 1

    # Close the browser once this listing is finished
    driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    keyword_list = ["ford mustang"]
    aggregate_files = []

    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
- MAX_RETRIES: The maximum number of retry attempts for failed HTTP requests.
- MAX_THREADS: The maximum number of threads that run concurrently during scraping.
- PAGES: The total number of search result pages to scrape per keyword.
- LOCATION: The country code or geographic location used for the scraping requests.
- keyword_list: The list of keywords the script searches for and scrapes product details from.

A Leboncoin search produces URLs like this one: https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
Here, text=ford+mustang specifies the search query: text is the query parameter, and ford+mustang is our keyword, "Ford Mustang", with the space replaced by +. Our search URLs therefore follow this structure: https://www.leboncoin.fr/recherche?text={FORMATTED_KEYWORD}
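As a quick illustration, here is a minimal sketch of how a search URL can be built from a raw keyword. The helper name build_search_url is hypothetical; the scraper later in this article does the same thing inline inside scrape_search_results().

# Minimal sketch: turn a raw keyword into a Leboncoin search URL.
# build_search_url is a hypothetical helper used only for illustration.
def build_search_url(keyword):
    formatted_keyword = keyword.replace(" ", "+")
    return f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"

print(build_search_url("ford mustang"))
# https://www.leboncoin.fr/recherche?text=ford+mustang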
Individual listing pages use URLs like https://www.leboncoin.fr/ad/voitures/2844784378, which follow the structure https://www.leboncoin.fr/ad/voitures/{LISTING_ID}.
Each result card on the search page is an a element that has a data-test-id of ad. This is visible in the screenshot below.

Pagination is controlled through the URL, for example: https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
The page=2 parameter tells the Leboncoin server to display the second page of results. Our complete URLs will follow this structure: https://www.leboncoin.fr/recherche?text=ford+mustang&page={page_number+1}. Because Python counts pages from 0 while Leboncoin starts at 1, we pass the page as page_number+1.
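To make the off-by-one handling concrete, here is a small sketch. The helper name paginated_urls is hypothetical; the real scraper builds each URL inside scrape_search_results().

# Minimal sketch: build paginated search URLs for the first few result pages.
def paginated_urls(keyword, pages):
    formatted_keyword = keyword.replace(" ", "+")
    return [
        f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number + 1}"
        for page_number in range(pages)
    ]

for url in paginated_urls("ford mustang", 3):
    print(url)
# ...&page=1, ...&page=2, ...&page=3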
When talking to the ScrapeOps Proxy API, a country parameter can be included. This allows us to specify a location, and ScrapeOps routes the request through that location. For example, to appear in the US we pass "country": "us", and to appear in the UK we pass "country": "uk".
To get set up, create a new project folder and a virtual environment:

mkdir leboncoin-scraper
cd leboncoin-scraper
python -m venv venv
source venv/bin/activate

Then install the dependencies:

pip install selenium
pip install webdriver-manager
We start by writing our first parsing function, scrape_search_results().
import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
    tries = 0
    success = False

    # Configure Selenium WebDriver
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--window-size=1920,1080")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            driver.get(url)
            logger.info(f"Opened URL: {url}")

            # Wait for the results to load
            WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
            )

            link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

            for card in link_cards:
                href = card.get_attribute("href")
                link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")

                # Extract information from the card
                try:
                    name_element = card.find_element(By.TAG_NAME, "p")
                    name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

                    price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
                    price_string = price_element.text
                    price = price_string[:-1]
                    currency = price_string[-1]

                    search_data = {
                        "name": name,
                        "url": link,
                        "price": price,
                        "currency": currency
                    }
                    print(search_data)
                except NoSuchElementException as e:
                    logger.warning(f"Failed to extract some details for a card: {e}")

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except TimeoutException as e:
            logger.error(f"Timeout occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
        finally:
            tries += 1

    if not success:
        driver.quit()
        raise Exception(f"Max Retries exceeded: {retries}")

    driver.quit()


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    # Job Processes
    for keyword in keyword_list:
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

    logger.info("Crawl complete.")
Each result card is an a element, which we locate using driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']"). Calling card.get_attribute("href") retrieves the href, which we combine with the domain name to generate a link for each listing. The listing name comes from the card's p elements, found with card.find_element(By.TAG_NAME, "p"), and is cleaned up with name_element.get_attribute("title").replace("/", "-").replace(" ", "-"). The price text is read from card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']").text, and string splitting is applied to extract both the price and currency.
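The splitting step is simple slicing: everything except the last character is the number, and the last character is the currency symbol. A minimal sketch, with an illustrative price string:

# Illustrative only: a typical price string ends with the currency symbol.
price_string = "12500€"

price = price_string[:-1]      # "12500"
currency = price_string[-1]    # "€"
print(price, currency)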
To scrape more than the first page of results, we use the page parameter. Paginated URLs follow this structure: https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}
To crawl multiple pages, we add a new function, start_scrape(). This function uses a for loop to scrape a specified range of pages.

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
import os import json import logging from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from webdriver_manager.chrome import ChromeDriverManager API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(url) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] search_data = { "name": name, "url": link, "price": price, "currency": currency } print(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 if not success: driver.quit() raise Exception(f"Max Retries exceeded: {retries}") driver.quit() def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["ford mustang"] aggregate_files = [] # Job Processes for keyword in keyword_list: start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
Our URLs are now built with the page parameter, https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}, and start_scrape() enables us to crawl across multiple pages. Next, we need to store the data we extract. For that we use a dataclass to represent the objects we want to save, as well as a DataPipeline to store these objects and eliminate duplicates. Below is the SearchData class, which represents the data objects we have been extracting.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip() if value else f"No {field.name}")
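As a quick usage sketch (assuming the dataclasses imports shown in the full listings), empty string fields are replaced with a default and whitespace is stripped as soon as an instance is created; the values below are illustrative:

item = SearchData(name="  Ford-Mustang-GT ", url="https://www.leboncoin.fr/ad/voitures/2844784378", price="12500")
print(item.name)      # "Ford-Mustang-GT" (stripped)
print(item.currency)  # "No currency" (empty fields get a default)

Next comes the DataPipeline class that will hold and deduplicate these objects.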
class DataPipeline:
    # Note: this class relies on module-level imports of os, csv, time, logging,
    # and the dataclasses helpers fields/asdict. Make sure "import time" is present,
    # since close_pipeline() calls time.sleep().
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = self.storage_queue[:]
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0

        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
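A minimal usage sketch of the pipeline on its own; the filename and values are illustrative:

# Illustrative: queue a couple of records and flush them to disk.
pipeline = DataPipeline(csv_filename="example-products.csv")
pipeline.add_data(SearchData(name="Ford-Mustang-GT", url="https://www.leboncoin.fr/ad/voitures/2844784378", price="12500", currency="€"))
pipeline.add_data(SearchData(name="Ford-Mustang-GT", url="https://www.leboncoin.fr/ad/voitures/2844784378", price="12500", currency="€"))  # duplicate name, dropped with a warning
pipeline.close_pipeline()  # writes example-products.csv with a single row

With the pipeline in place, the full crawler below stores SearchData objects through DataPipeline instead of printing them.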
import os import csv import json import logging from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from dataclasses import dataclass, field, fields, asdict from webdriver_manager.chrome import ChromeDriverManager API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the SearchData dataclass @dataclass class SearchData: name: str = "" url: str = "" price: str = "" currency: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) setattr(self, field.name, value.strip() if value else f"No {field.name}") # Define the DataPipeline class class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = self.storage_queue[:] self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Main scraping function def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(url) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] # Store data in SearchData and add to pipeline search_data = SearchData(name=name, url=link, price=price, currency=currency) data_pipeline.add_data(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 driver.quit() if not success: raise Exception(f"Max retries exceeded for {url}") # Function to start the scraping process def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info("Crawl starting...") keyword_list = ["ford mustang"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") + ".csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info("Crawl complete.")
Inside the main function, we initialize a new DataPipeline and pass it to start_scrape(), which then forwards it to scrape_search_results(). Each parsed result is turned into a SearchData object and added to the DataPipeline using the add_data() method. When the crawl finishes, we close the pipeline with the close_pipeline() method.

Remember how we wrote start_scrape() using a for loop? We will now enhance it for better speed and efficiency by replacing the for loop with a more powerful tool: ThreadPoolExecutor. This approach allows us to run a specific function across multiple threads simultaneously. Below is the updated start_scrape() function.
import concurrent.futures

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
Notice that start_scrape() now calls executor.map(). scrape_search_results is what we want to execute on each thread, while the remaining arguments are lists that are passed into scrape_search_results() as parameters.
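To illustrate how executor.map() pairs those lists up, here is a tiny standalone sketch; show_args is a throwaway stand-in for scrape_search_results(). Each call receives one element from every list, so every page shares the same keyword, location, pipeline, and retry count.

import concurrent.futures

def show_args(keyword, location, page, pipeline, retries):
    # Stand-in for scrape_search_results(); just prints what it would receive.
    print(f"keyword={keyword}, location={location}, page={page}, retries={retries}")

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(
        show_args,
        ["ford mustang"] * pages,
        ["us"] * pages,
        range(pages),
        [None] * pages,   # data_pipeline placeholder
        [3] * pages,
    )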
Next, to route our requests through the ScrapeOps proxy, we write get_scrapeops_url().
from urllib.parse import urlencode

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
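Assuming API_KEY has already been loaded from config.json, a quick check of what this helper produces looks roughly like this:

print(get_scrapeops_url("https://www.leboncoin.fr/recherche?text=ford+mustang&page=1", location="us"))
# https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.leboncoin.fr%2Frecherche%3Ftext%3Dford%2Bmustang%26page%3D1&country=us

The full crawler with proxy support and concurrency is shown below.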
import os import csv import json import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from dataclasses import dataclass, field, fields, asdict from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Helper function to get the ScrapeOps proxy URL def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } return "https://proxy.scrapeops.io/v1/?" + urlencode(payload) # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the SearchData dataclass @dataclass class SearchData: name: str = "" url: str = "" price: str = "" currency: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) setattr(self, field.name, value.strip() if value else f"No {field.name}") # Define the DataPipeline class class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = self.storage_queue[:] self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Main scraping function def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location)) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] # Store data in SearchData and add to pipeline search_data = SearchData(name=name, url=link, price=price, currency=currency) data_pipeline.add_data(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 driver.quit() if not success: raise Exception(f"Max retries exceeded for {url}") # Function to start the scraping process def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info("Crawl starting...") keyword_list = ["ford mustang"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") + ".csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info("Crawl complete.")
The key change in scrape_search_results() is that the driver now opens the proxied URL: driver.get(get_scrapeops_url(url, location)). Time to test the crawler out in production. Take a look at our updated main.
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info("Crawl starting...")

    keyword_list = ["ford mustang"]
    aggregate_files = []

    for keyword in keyword_list:
        filename = keyword.replace(" ", "-") + ".csv"
        crawl_pipeline = DataPipeline(csv_filename=filename)
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(filename)  # filename already carries the .csv extension

    logger.info("Crawl complete.")
- keyword_list: Holds the list of keywords to be searched and scraped.
- MAX_RETRIES: Determines how many times the scraper will attempt to fetch a page after encountering an error.
- MAX_THREADS: Sets the maximum number of threads available for concurrent scraping.
- PAGES: Indicates the number of pages to scrape for each keyword.
- LOCATION: Specifies the geographic origin of the scraping requests.

With the crawler finished, we move on to the scraper, starting with its parsing function, process_item().

def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--window-size=1920,1080")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            # location is unused here; it comes into play once proxy support is added
            driver.get(url)
            script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
            json_data = json.loads(script_element.get_attribute("text"))
            print(json_data)
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries - tries}")
        finally:
            driver.quit()
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
Each listing page embeds its data in a script element of type application/ld+json, which we read and parse with json.loads().
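For orientation, the embedded JSON-LD roughly follows schema.org vehicle markup; the snippet below is an illustrative, heavily trimmed example rather than real listing data, but the field names match what process_item() reads.

import json

# Illustrative structure only.
script_text = """
{
  "name": "Ford Mustang GT",
  "description": "Coupe in good condition",
  "offers": {"price": 12500, "priceCurrency": "EUR"},
  "brand": {"name": "Ford"},
  "model": "Mustang",
  "vehicleModelDate": "2015",
  "mileageFromOdometer": {"value": "98000"},
  "vehicleTransmission": "manual"
}
"""
json_data = json.loads(script_text)
print(json_data["offers"]["price"], json_data["brand"]["name"])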
Like our crawler, the scraper needs a function similar to start_scrape(). Instead of processing a sequential list of pages, this function will load the URLs from our CSV file into an array and apply process_item() to each one. Below is the process_results() function.
def process_results(csv_file, location, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)
import os import csv import json import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from dataclasses import dataclass, field, fields, asdict from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Helper function to get the ScrapeOps proxy URL def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } return "https://proxy.scrapeops.io/v1/?" + urlencode(payload) # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the SearchData dataclass @dataclass class SearchData: name: str = "" url: str = "" price: str = "" currency: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) setattr(self, field.name, value.strip() if value else f"No {field.name}") # Define the DataPipeline class class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = self.storage_queue[:] self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Main scraping function def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location)) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] # Store data in SearchData and add to pipeline search_data = SearchData(name=name, url=link, price=price, currency=currency) data_pipeline.add_data(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 driver.quit() if not success: raise Exception(f"Max retries exceeded for {url}") # Function to start the scraping process def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(url) script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") json_data = json.loads(script_element.get_attribute("text")) 
print(json_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries - tries}") finally: driver.quit() tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, retries=3): logger.info(f"Processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["ford mustang"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
Since we already have a working DataPipeline, all we need is an additional dataclass. This new dataclass will be named VehicleData. Check out VehicleData below.

@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                value = getattr(self, field.name)
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    setattr(self, field.name, value.strip())
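A usage sketch showing how the parsed JSON-LD maps onto VehicleData, using .get() fallbacks since not every ad carries every field; the values here are illustrative:

# Illustrative json_data, e.g. parsed from the listing's ld+json script.
json_data = {"name": "Ford Mustang GT", "offers": {"price": 12500, "priceCurrency": "EUR"}}

vehicle = VehicleData(
    name=json_data.get("name", "No name"),
    price=json_data.get("offers", {}).get("price", 0),
    currency=json_data.get("offers", {}).get("priceCurrency", "No currency"),
    brand=json_data.get("brand", {}).get("name", "No brand"),
)
print(vehicle.brand)  # "No brand" — missing fields fall back to defaults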
To store the parsed data, we open a new DataPipeline within process_item() and pass VehicleData
into it.import os import re import csv import json import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from dataclasses import dataclass, field, fields, asdict from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Helper function to get the ScrapeOps proxy URL def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } return "https://proxy.scrapeops.io/v1/?" + urlencode(payload) # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the SearchData dataclass @dataclass class SearchData: name: str = "" url: str = "" price: str = "" currency: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) setattr(self, field.name, value.strip() if value else f"No {field.name}") @dataclass class VehicleData: name: str = "" description: str = "" price: int = 0 currency: str = "" brand: str = "" model: str = "" year: str = "" mileage: int = 0 transmission: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) # Define the DataPipeline class class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = self.storage_queue[:] self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Main scraping function def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location)) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] # Store data in SearchData and add to pipeline search_data = SearchData(name=name, url=link, price=price, currency=currency) data_pipeline.add_data(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 driver.quit() if not success: raise Exception(f"Max retries exceeded for {url}") # Function to start the scraping process def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(url) script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") json_data = json.loads(script_element.get_attribute("text")) 
print(json_data) safe_filename = re.sub(r'[<>:"/|?*x00-x1F]', '_', row['name']) # Replace invalid characters with '_' vehicle_pipeline = DataPipeline(f"{safe_filename}.csv") #added some checks because not all ads will be same, some ads may have same keywords but not exactly about that thing, for example in this case, some ads are for parts of Ford Mustang vehicle_data = VehicleData( name=json_data.get("name", "No name"), description=json_data.get("description", "No description"), price=json_data.get("offers", {}).get("price", 0), currency=json_data.get("offers", {}).get("priceCurrency", "No currency"), brand=json_data.get("brand", {}).get("name", "No brand"), model=json_data.get("model", "No model"), year=json_data.get("vehicleModelDate", "No year"), mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)), transmission=json_data.get("vehicleTransmission", "No transmission") ) vehicle_pipeline.add_data(vehicle_data) vehicle_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries - tries}") finally: driver.quit() tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, retries=3): logger.info(f"Processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["ford mustang"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
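One note on the filename sanitization inside process_item(): it strips characters that are not valid in file names before naming each vehicle's CSV. A standalone sketch of that step, with the character class fully escaped (safe_filename is a hypothetical helper; the article's code does this inline):

import re

def safe_filename(name):
    # Replace characters that are invalid in file names (plus control chars) with '_'.
    return re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name)

print(safe_filename('Ford-Mustang-GT/V8 "2015"'))  # Ford-Mustang-GT_V8 _2015_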
VehicleData is utilized to represent the detailed information extracted when scraping these listings. Just like SearchData, the data is saved to a CSV file using the DataPipeline. As before, we speed the scraper up by replacing the for loop with ThreadPoolExecutor. Check out the code snippet below.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
process_item is the function we want to execute across multiple threads. The arguments to process_item are passed in as arrays, just as we did previously. Finally, we need to use get_scrapeops_url() in one more part of the code. This time, it is applied to the URL we open within the process_item() function:

driver.get(get_scrapeops_url(url, location))
import os import re import csv import json import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from dataclasses import dataclass, field, fields, asdict from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Helper function to get the ScrapeOps proxy URL def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } return "https://proxy.scrapeops.io/v1/?" + urlencode(payload) # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the SearchData dataclass @dataclass class SearchData: name: str = "" url: str = "" price: str = "" currency: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) setattr(self, field.name, value.strip() if value else f"No {field.name}") @dataclass class VehicleData: name: str = "" description: str = "" price: int = 0 currency: str = "" brand: str = "" model: str = "" year: str = "" mileage: int = 0 transmission: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) # Define the DataPipeline class class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = self.storage_queue[:] self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Main scraping function def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location)) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] # Store data in SearchData and add to pipeline search_data = SearchData(name=name, url=link, price=price, currency=currency) data_pipeline.add_data(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 driver.quit() if not success: raise Exception(f"Max retries exceeded for {url}") # Function to start the scraping process def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location)) script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") json_data = 
json.loads(script_element.get_attribute("text")) print(json_data) safe_filename = re.sub(r'[<>:"/|?*x00-x1F]', '_', row['name']) # Replace invalid characters with '_' vehicle_pipeline = DataPipeline(f"{safe_filename}.csv") #added some checks because not all ads will be same, some ads may have same keywords but not exactly about that thing, for example in this case, some ads are for parts of Ford Mustang vehicle_data = VehicleData( name=json_data.get("name", "No name"), description=json_data.get("description", "No description"), price=json_data.get("offers", {}).get("price", 0), currency=json_data.get("offers", {}).get("priceCurrency", "No currency"), brand=json_data.get("brand", {}).get("name", "No brand"), model=json_data.get("model", "No model"), year=json_data.get("vehicleModelDate", "No year"), mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)), transmission=json_data.get("vehicleTransmission", "No transmission") ) vehicle_pipeline.add_data(vehicle_data) vehicle_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries - tries}") finally: driver.quit() tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_item, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["ford mustang"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
As before, you can adjust the scraper's behavior by changing the constants in main.
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info("Crawl starting...")

    keyword_list = ["ford mustang"]
    aggregate_files = []

    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info("Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Like most websites, Leboncoin publishes Terms and Conditions and a robots.txt file that outline their expectations for users. Ignoring these policies could result in being banned from the site. You can review these policies through the links below. NOTE: The Terms and Conditions are in French!