Then check out ScrapeOps, the complete toolkit for web scraping.
First, create a config.json file with your ScrapeOps API key:

{"api_key": "your-super-secret-api-key"}

Then copy and paste the code below into a Python file:

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)

                vehicle_pipeline = DataPipeline(f"{row['name']}.csv")
                vehicle_data = VehicleData(
                    name=json_data["name"],
                    description=json_data["description"],
                    price=json_data["offers"]["price"],
                    currency=json_data["offers"]["priceCurrency"],
                    brand=json_data["brand"]["name"],
                    model=json_data["model"],
                    year=json_data["vehicleModelDate"],
                    mileage=int(json_data["mileageFromOdometer"]["value"]),
                    transmission=json_data["vehicleTransmission"]
                )
                vehicle_pipeline.add_data(vehicle_data)
                vehicle_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
If you'd like to tweak this scraper, feel free to change any of the following constants inside main:

- MAX_RETRIES: Maximum number of retry attempts for failed HTTP requests.
- MAX_THREADS: Maximum number of threads that will run concurrently during the scraping process.
- PAGES: How many pages of search results to scrape for each keyword.
- LOCATION: The geographic location or country code for the scraping process.
- keyword_list: A list of product keywords for which the script will perform searches and scrape product information.

Leboncoin search URLs look like this:

https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
text=ford+mustang holds our search query: text represents the query parameter, and ford+mustang represents a keyword search for "ford mustang". Without pagination, our base search URLs look like this:

https://www.leboncoin.fr/recherche?text={FORMATTED_KEYWORD}
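If you want to see how a keyword turns into one of these URLs, here is a small sketch (not part of the finished scraper) that formats a keyword the same way the crawler below does, by replacing spaces with +:

# Sketch: build a Leboncoin search URL from a keyword.
# The real crawler does the same thing inside scrape_search_results().
keyword = "ford mustang"
formatted_keyword = keyword.replace(" ", "+")
search_url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
print(search_url)
# https://www.leboncoin.fr/recherche?text=ford+mustang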
https://www.leboncoin.fr/ad/voitures/2844784378
https://www.leboncoin.fr/ad/voitures/{LISTING_ID}
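In the crawler we build below, we don't construct these listing URLs by hand; we read the href from each result card and prepend the domain. Here is a tiny sketch of that step, using the listing ID from the example above:

# Sketch: each result card's href is a relative path, so we prepend the domain.
href = "/ad/voitures/2844784378"  # example value pulled from a result card
link = f"https://www.leboncoin.fr{href}"
print(link)
# https://www.leboncoin.fr/ad/voitures/2844784378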
Each result link is an a element with a data-test-id of ad. You can see this in the shot below.

Now, let's look at our product data. Our product data comes nested in a JSON blob. Below are two screenshots, one where we're not prompted to accept cookies and one with the cookie prompt. The JSON blob is present on both pages, so we don't need to worry about clicking the cookie button.

Search results are paginated with the page parameter:

https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
page=2 tells the Leboncoin server that we want page 2 of the results. Our full URLs will look like this:

https://www.leboncoin.fr/recherche?text=ford+mustang&page={page_number+1}
We use page_number+1 because Python begins counting at 0.

To control our geolocation, the ScrapeOps Proxy gives us the country param. This parameter allows us to set a custom location and ScrapeOps will route our request through that location. If we want to appear in the US, we pass "country": "us". If we want to appear in the UK, we pass "country": "uk".

Now let's get started on the project. First, create a new project folder and move into it:

mkdir leboncoin-scraper
cd leboncoin-scraper
Create a new virtual environment and activate it:

python -m venv venv
source venv/bin/activate

Then install our dependencies:

pip install requests
pip install beautifulsoup4

Now we can write our first parsing function, scrape_search_results(). Here is our starter script:

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = {
                    "name": name,
                    "url": link,
                    "price": price,
                    "currency": currency
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
- Each result card is an a element; we find them all with soup.select("a[data-test-id='ad']").
- card.get("href") gives us the href. We format this with our domain name to create a link to each listing.
- We find the card's p elements with card.find_all("p").
- p_elements[0].get("title").replace("/", "-").replace(" ", "-") gives us the name of each listing.
- card.select_one("span[data-qa-id='aditem_price']").text gets our price_string. We use string splitting to get both the price and currency from this.

Now that we can parse a page of results, we need the ability to crawl multiple pages. To do this, we add a parameter to our URL: page. Our paginated URLs look like this:

https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}
We also need a function that calls scrape_search_results() on a whole list of pages: start_scrape(). Here is our new start_scrape() function. It uses a for loop to allow us to scrape a list of pages.

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
Our full code now looks like this:

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = {
                    "name": name,
                    "url": link,
                    "price": price,
                    "currency": currency
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}.
start_scrape() allows us to crawl multiple pages.
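To make the page_number+1 behaviour concrete, here is a short sketch (not part of the script) showing how a range of zero-indexed page numbers maps onto Leboncoin's one-indexed page parameter:

# Sketch: Python counts from 0, Leboncoin pages start at 1.
keyword = "ford mustang"
formatted_keyword = keyword.replace(" ", "+")
PAGES = 3

for page_number in range(PAGES):
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    print(url)
# ...&page=1, ...&page=2, ...&page=3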
To store our data properly, we need a dataclass to represent the objects we want to store, and we also need a DataPipeline to store these objects and filter out duplicates.

Here is our SearchData class; it represents the data objects we've been extracting.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
Next up is our DataPipeline. We use it to pipe SearchData objects into our CSV file.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
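As a quick illustration (not part of the finished script), this is roughly how the pipeline gets driven: open it with a filename, feed it SearchData objects via add_data(), and call close_pipeline() when you're done so anything left in the queue is flushed to disk. Note that close_pipeline() relies on the time module, so the full script imports time.

# Sketch: how the DataPipeline is used (the real script does this from main and the parser).
pipeline = DataPipeline(csv_filename="example-output.csv")

item = SearchData(
    name="Ford-Mustang-GT",                                   # example values
    url="https://www.leboncoin.fr/ad/voitures/2844784378",
    price="25000",
    currency="€"
)
pipeline.add_data(item)    # duplicates (same name) are dropped automatically
pipeline.close_pipeline()  # flush whatever is still in the storage queue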
After putting everything together, our full code now looks like this:

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
From inside our main, we open a new DataPipeline and pass it into start_scrape(), which in turn passes it into scrape_search_results(). From within the parsing function, we turn our parsed results into SearchData and pass them into the DataPipeline with the add_data() method. Once the crawl has finished, we close the pipeline with the close_pipeline() method.

Remember how we wrote start_scrape() with a for loop? Now we're going to make it faster and more efficient. Here, we'll replace that for loop with something much more powerful: ThreadPoolExecutor. This gives us the ability to call a specific function of our choice on multiple threads.

Here is our rewritten start_scrape() function.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
Pay attention to the arguments we pass into executor.map():

- scrape_search_results is the function we want called on each thread.
- All other arguments get passed in as lists, which executor.map() then passes into each call of scrape_search_results().

To control our location and get past any blockers, we route our requests through the ScrapeOps Proxy with a simple function, get_scrapeops_url().

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
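To see what this function actually changes, here is a quick usage sketch. The only difference is the URL we hand to requests.get(); everything else in the scraper stays the same.

# Sketch: wrap a target URL in the ScrapeOps proxy endpoint.
target_url = "https://www.leboncoin.fr/recherche?text=ford+mustang&page=1"
proxied_url = get_scrapeops_url(target_url, location="us")
# proxied_url now points at https://proxy.scrapeops.io/v1/ with api_key,
# url and country passed along as query parameters.
response = requests.get(proxied_url)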
Here is our fully proxied code:

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
Now it's time to test the crawler out. Take a look at our updated main; we'll be crawling 3 pages this time.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
- keyword_list: Contains a list of keywords to be searched and scraped.
- MAX_RETRIES: Specifies the number of times the scraper will retry fetching a page if it encounters an error.
- MAX_THREADS: Defines the maximum number of threads to be used for concurrent scraping.
- PAGES: Specifies the number of pages to scrape for each keyword.
- LOCATION: Defines the geographic location from which the scraping requests appear to originate.

Now that we're crawling search results, we need to scrape each individual listing. We'll start with a parsing function, process_item().

def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)
                print(json_data)

                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
All of our listing data comes embedded in a script tag with a type of application/ld+json. We find this element with soup.select_one() and load its text with json.loads().

Next, we need a function similar to start_scrape(). Instead of scraping a numbered list of pages, this one will read our CSV file into an array and run process_item() on each row. Here is our process_results() function.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)
You can see how everything fits together in the full code below:

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)
                print(json_data)

                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
We already have a working DataPipeline; we just need another dataclass to pass into it. We're going to call this one VehicleData. Take a look at VehicleData below.

@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
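Here is a small sketch (with made-up values) showing how __post_init__ cleans the string fields: empty strings get a default like "No description", and everything else gets stripped of stray whitespace.

# Sketch: VehicleData fills in defaults for empty strings and strips whitespace.
vehicle = VehicleData(
    name="  Ford Mustang GT  ",  # will be stripped
    description="",              # will become "No description"
    price=25000,
    currency="EUR",
    brand="Ford",
    model="Mustang",
    year="2018",
    mileage=40000,
    transmission="Manual"
)
print(vehicle.name)         # "Ford Mustang GT"
print(vehicle.description)  # "No description"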
In the full code below, we open a new DataPipeline from inside process_item() and pass our VehicleData into it.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)

                vehicle_pipeline = DataPipeline(f"{row['name']}.csv")
                vehicle_data = VehicleData(
                    name=json_data["name"],
                    description=json_data["description"],
                    price=json_data["offers"]["price"],
                    currency=json_data["offers"]["priceCurrency"],
                    brand=json_data["brand"]["name"],
                    model=json_data["model"],
                    year=json_data["vehicleModelDate"],
                    mileage=int(json_data["mileageFromOdometer"]["value"]),
                    transmission=json_data["vehicleTransmission"]
                )
                vehicle_pipeline.add_data(vehicle_data)
                vehicle_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
- VehicleData is used to represent the detailed information we pull when scraping each listing.
- Just like SearchData, we save it to a CSV file through the DataPipeline.

Now we'll once again use ThreadPoolExecutor to replace a for loop, this time inside process_results(). Take a look at the snippet below.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
This time around:

- process_item is the function we wish to call on multiple threads.
- All other arguments to process_item get passed in as lists, just like before.

Finally, we just need to use get_scrapeops_url() at one more place in our code. This time, we'll use it on the request made from our process_item() function:

response = requests.get(get_scrapeops_url(url, location=location))

Our full production code is available below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            link_cards = soup.select("a[data-test-id='ad']")

            for card in link_cards:
                href = card.get("href")
                link = f"https://www.leboncoin.fr{href}"
                p_elements = card.find_all("p")
                name = p_elements[0].get("title").replace("/", "-").replace(" ", "-")
                price_string = card.select_one("span[data-qa-id='aditem_price']").text
                price = price_string[:-1]
                currency = price_string[-1]

                search_data = SearchData(
                    name=name,
                    url=link,
                    price=price,
                    currency=currency
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                script_text = soup.select_one("script[type='application/ld+json']").text
                json_data = json.loads(script_text)

                vehicle_pipeline = DataPipeline(f"{row['name']}.csv")
                vehicle_data = VehicleData(
                    name=json_data["name"],
                    description=json_data["description"],
                    price=json_data["offers"]["price"],
                    currency=json_data["offers"]["priceCurrency"],
                    brand=json_data["brand"]["name"],
                    model=json_data["model"],
                    year=json_data["vehicleModelDate"],
                    mileage=int(json_data["mileageFromOdometer"]["value"]),
                    transmission=json_data["vehicleTransmission"]
                )
                vehicle_pipeline.add_data(vehicle_data)
                vehicle_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Time to run the full scraper in production. Once again, take a look at our main; we'll crawl 3 pages of results and then scrape each individual listing.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Like most websites, Leboncoin publishes Terms and Conditions and a robots.txt that they expect people to follow. Failure to respect these policies can even get you banned from the site. You can take a look at them below.

NOTE: The Terms and Conditions are in French!

Then check out ScrapeOps, the complete toolkit for web scraping.
Create a new project folder and add a config.json file in it with your ScrapeOps API key:

{"api_key": "your-super-secret-api-key"}

Then copy and paste the code below into a new Python file:

import os
import re
import csv
import json
import time
import logging
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from dataclasses import dataclass, field, fields, asdict
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


# Helper function to get the ScrapeOps proxy URL
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Define the SearchData dataclass
@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip() if value else f"No {field.name}")


@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                value = getattr(self, field.name)
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    setattr(self, field.name, value.strip())


# Define the DataPipeline class
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = self.storage_queue[:]
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()


# Main scraping function
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}"
    tries = 0
    success = False

    # Configure Selenium WebDriver
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--window-size=1920,1080")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            driver.get(get_scrapeops_url(url, location))
            logger.info(f"Opened URL: {url}")

            # Wait for results to load
            WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
            )

            link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

            for card in link_cards:
                href = card.get_attribute("href")
                link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")

                try:
                    name_element = card.find_element(By.TAG_NAME, "p")
                    name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

                    price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
                    price_string = price_element.text
                    price = price_string[:-1]
                    currency = price_string[-1]

                    # Store data in SearchData and add to pipeline
                    search_data = SearchData(name=name, url=link, price=price, currency=currency)
                    data_pipeline.add_data(search_data)
                except NoSuchElementException as e:
                    logger.warning(f"Failed to extract some details for a card: {e}")

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except TimeoutException as e:
            logger.error(f"Timeout occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
        finally:
            tries += 1

    driver.quit()

    if not success:
        raise Exception(f"Max retries exceeded for {url}")


# Function to start the scraping process
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--window-size=1920,1080")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            driver.get(get_scrapeops_url(url, location))

            script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
            json_data = json.loads(script_element.get_attribute("text"))
            print(json_data)

            # Replace characters that aren't allowed in filenames with '_'
            safe_filename = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', row['name'])
            vehicle_pipeline = DataPipeline(f"{safe_filename}.csv")

            # Not every ad is a full vehicle listing (some only sell parts for a Ford Mustang),
            # so we use .get() with defaults instead of assuming every field exists.
            vehicle_data = VehicleData(
                name=json_data.get("name", "No name"),
                description=json_data.get("description", "No description"),
                price=json_data.get("offers", {}).get("price", 0),
                currency=json_data.get("offers", {}).get("priceCurrency", "No currency"),
                brand=json_data.get("brand", {}).get("name", "No brand"),
                model=json_data.get("model", "No model"),
                year=json_data.get("vehicleModelDate", "No year"),
                mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)),
                transmission=json_data.get("vehicleTransmission", "No transmission")
            )
            vehicle_pipeline.add_data(vehicle_data)
            vehicle_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries - tries}")
        finally:
            tries += 1

    driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    keyword_list = ["ford mustang"]
    aggregate_files = []

    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
- MAX_RETRIES: the maximum number of retry attempts for HTTP requests that fail.
- MAX_THREADS: the maximum number of threads running concurrently while scraping.
- PAGES: the total number of search result pages to scrape per keyword.
- LOCATION: the country code or geographic location used in the scraping process.
- keyword_list: the list of search keywords whose results the script will look up and scrape.

Take a look at a typical search URL:

https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
In this URL, text=ford+mustang specifies the search query: text is the query parameter, and ford+mustang is the URL-encoded keyword "Ford Mustang." Our search URLs therefore follow this format:

https://www.leboncoin.fr/recherche?text={FORMATTED_KEYWORD}
https://www.leboncoin.fr/ad/voitures/2844784378
https://www.leboncoin.fr/ad/voitures/{LISTING_ID}
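To make these formats concrete, here is a small sketch; the build_search_url and build_listing_url helpers are our own illustrations, not part of the scraper:

# Hypothetical helpers illustrating the two URL formats discussed above.
def build_search_url(keyword: str, page_number: int = 0) -> str:
    formatted_keyword = keyword.replace(" ", "+")
    return f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number + 1}"

def build_listing_url(listing_id: str) -> str:
    return f"https://www.leboncoin.fr/ad/voitures/{listing_id}"

print(build_search_url("ford mustang", 1))   # second page of results
print(build_listing_url("2844784378"))       # the example listing shown above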
Each search result sits inside an a element that has a data-test-id of ad. This is visible in the screenshot below. Let's now examine our product data. The product data is contained within a nested JSON blob. Below, there are two screenshots: one without the cookie prompt and another with it. Since the JSON blob appears on both pages, clicking the cookie button is unnecessary. Finally, let's look at how Leboncoin handles pagination. Consider this URL:

https://www.leboncoin.fr/recherche?text=ford+mustang&page=2
The page=2 parameter tells the Leboncoin server to display the second page of results. Our complete URLs will follow this structure:

https://www.leboncoin.fr/recherche?text=ford+mustang&page={page_number+1}
Since Python's range() starts at 0, we add 1 to the page number when building the URL: page={page_number+1}. When routing requests through the ScrapeOps proxy, a country parameter can also be included. This allows us to specify a location, and ScrapeOps routes the request through that country. For example, "country": "us" routes the request through the US, while "country": "uk" routes it through the UK. To get started, create a new project folder and move into it:

mkdir leboncoin-scraper
cd leboncoin-scraper
python -m venv venv
source venv/bin/activate
pip install selenium
pip install webdriver-manager
Our first iteration is built around a single parsing function, scrape_search_results().

import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}"
    tries = 0
    success = False

    # Configure Selenium WebDriver
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--window-size=1920,1080")
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            driver.get(url)
            logger.info(f"Opened URL: {url}")

            # Wait for the results to load
            WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']"))
            )

            link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']")

            for card in link_cards:
                href = card.get_attribute("href")
                link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/")

                # Extract information from the card
                try:
                    name_element = card.find_element(By.TAG_NAME, "p")
                    name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-")

                    price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']")
                    price_string = price_element.text
                    price = price_string[:-1]
                    currency = price_string[-1]

                    search_data = {
                        "name": name,
                        "url": link,
                        "price": price,
                        "currency": currency
                    }
                    print(search_data)
                except NoSuchElementException as e:
                    logger.warning(f"Failed to extract some details for a card: {e}")

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except TimeoutException as e:
            logger.error(f"Timeout occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
        finally:
            tries += 1

    if not success:
        driver.quit()
        raise Exception(f"Max Retries exceeded: {retries}")

    driver.quit()


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["ford mustang"]
    aggregate_files = []

    # Job Processes
    for keyword in keyword_list:
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

    logger.info("Crawl complete.")
Each result card is an a element, which we locate using driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']"). card.get_attribute("href") retrieves the href, which we combine with the domain name to generate a link for each listing. The listing name comes from the card's first p element, found with card.find_element(By.TAG_NAME, "p") and read via name_element.get_attribute("title").replace("/", "-").replace(" ", "-"). The price text comes from card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']").text, and simple string slicing splits it into the price and the currency symbol (a short sketch of that split follows below). To paginate our results, we add a page parameter. Paginated URLs follow this structure:

https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}
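Here is a minimal sketch of that split; the sample price string is hypothetical:

# Minimal sketch of the price/currency split described above.
# The example string is made up; real listings may format prices differently.
price_string = "15 000 €"    # text of span[data-qa-id='aditem_price']
price = price_string[:-1]    # everything except the last character -> "15 000 "
currency = price_string[-1]  # only the last character              -> "€"
print(price.strip(), currency)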
Next, we add a start_scrape() function. This function uses a for loop to scrape a specified range of pages.

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
import os import json import logging from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from webdriver_manager.chrome import ChromeDriverManager API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(url) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] search_data = { "name": name, "url": link, "price": price, "currency": currency } print(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 if not success: driver.quit() raise Exception(f"Max Retries exceeded: {retries}") driver.quit() def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["ford mustang"] aggregate_files = [] # Job Processes for keyword in keyword_list: start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}
start_scrape() enables us to crawl across multiple pages. Next, we need a dataclass to represent the objects we want to save, as well as a DataPipeline to store these objects and eliminate duplicates. Below is the SearchData class, which represents the data objects we have been extracting.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    price: str = ""
    currency: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip() if value else f"No {field.name}")
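For instance, here is a tiny, illustrative check of that normalization (assuming the SearchData class above is defined; the values are made up):

# Empty strings become "No <field>" and other strings are stripped by __post_init__.
item = SearchData(name="  Ford Mustang GT ", url="", price="15 000", currency="€")
print(item.name)  # "Ford Mustang GT"
print(item.url)   # "No url"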
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = self.storage_queue[:]
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0

        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)  # note: requires `import time` at the top of the script
        if self.storage_queue:
            self.save_to_csv()
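As a quick usage sketch (our own illustration, assuming the SearchData and DataPipeline classes above are defined; the filename and URLs are made up):

# Queue a few SearchData items and flush them to a CSV file.
pipeline = DataPipeline(csv_filename="example-results.csv", storage_queue_limit=50)
pipeline.add_data(SearchData(name="Ford Mustang GT", url="https://www.leboncoin.fr/ad/voitures/2844784378", price="15 000", currency="€"))
pipeline.add_data(SearchData(name="Ford Mustang GT", url="https://www.leboncoin.fr/ad/voitures/1111111111", price="14 500", currency="€"))  # duplicate name -> dropped
pipeline.close_pipeline()  # writes anything still in the queue to example-results.csv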
import os import csv import json import logging from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from dataclasses import dataclass, field, fields, asdict from webdriver_manager.chrome import ChromeDriverManager API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the SearchData dataclass @dataclass class SearchData: name: str = "" url: str = "" price: str = "" currency: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) setattr(self, field.name, value.strip() if value else f"No {field.name}") # Define the DataPipeline class class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = self.storage_queue[:] self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Main scraping function def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(url) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] # Store data in SearchData and add to pipeline search_data = SearchData(name=name, url=link, price=price, currency=currency) data_pipeline.add_data(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 driver.quit() if not success: raise Exception(f"Max retries exceeded for {url}") # Function to start the scraping process def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info("Crawl starting...") keyword_list = ["ford mustang"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") + ".csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info("Crawl complete.")
In our main function, we initialize a new DataPipeline and pass it to start_scrape(), which then forwards it to scrape_search_results(). Extracted results are turned into SearchData objects and added to the DataPipeline using the add_data() method. When the crawl finishes, the pipeline is closed with the close_pipeline() method. Remember how start_scrape() used a for loop? We will now enhance it for better speed and efficiency by replacing the for loop with a more powerful tool: ThreadPoolExecutor. This approach allows us to run a specific function across multiple threads simultaneously. Below is the updated start_scrape() function.

import concurrent.futures

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
Take a closer look at executor.map(): scrape_search_results is what we want to execute on each thread, while the remaining arguments are lists whose elements are passed into scrape_search_results() as parameters (a standalone sketch of this pattern follows the proxy function below). To route our requests through a proxy, we also add get_scrapeops_url().

from urllib.parse import urlencode

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
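Here is a standalone, illustrative sketch of how executor.map() pairs up those lists; the show() function and its values are purely for demonstration:

# The i-th element of each list becomes the arguments of one call, run on the pool.
import concurrent.futures

def show(keyword, location, page):
    print(f"scraping '{keyword}' in {location}, page {page + 1}")

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(show, ["ford mustang"] * pages, ["us"] * pages, range(pages))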
import os import csv import json import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from dataclasses import dataclass, field, fields, asdict from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Helper function to get the ScrapeOps proxy URL def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } return "https://proxy.scrapeops.io/v1/?" + urlencode(payload) # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the SearchData dataclass @dataclass class SearchData: name: str = "" url: str = "" price: str = "" currency: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) setattr(self, field.name, value.strip() if value else f"No {field.name}") # Define the DataPipeline class class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = self.storage_queue[:] self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Main scraping function def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location)) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] # Store data in SearchData and add to pipeline search_data = SearchData(name=name, url=link, price=price, currency=currency) data_pipeline.add_data(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 driver.quit() if not success: raise Exception(f"Max retries exceeded for {url}") # Function to start the scraping process def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info("Crawl starting...") keyword_list = ["ford mustang"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") + ".csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info("Crawl complete.")
To route requests through the proxy, the parsing function now calls driver.get(get_scrapeops_url(url, location)). Here is our updated main:

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info("Crawl starting...")

    keyword_list = ["ford mustang"]
    aggregate_files = []

    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info("Crawl complete.")
- keyword_list: Holds the list of keywords to be searched and scraped.
- MAX_RETRIES: Determines how many times the scraper will attempt to fetch a page after encountering an error.
- MAX_THREADS: Sets the maximum number of threads available for concurrent scraping.
- PAGES: Indicates the number of pages to scrape for each keyword.
- LOCATION: Specifies the geographic origin of the scraping requests.

Next, we add a parsing function for individual listings, process_item(), which opens a listing and prints its embedded JSON data:

def process_item(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--window-size=1920,1080")
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            driver.get(url)
            script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']")
            json_data = json.loads(script_element.get_attribute("text"))
            print(json_data)
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries - tries}")
        finally:
            driver.quit()
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
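To make the JSON step concrete, here is a hedged sketch with a simplified, made-up ld+json payload; real listings contain many more fields:

# Parse a (hypothetical) ld+json blob the way process_item() does.
import json

script_text = '''{
  "name": "Ford Mustang GT",
  "offers": {"price": 15000, "priceCurrency": "EUR"},
  "brand": {"name": "Ford"}
}'''

json_data = json.loads(script_text)
print(json_data["offers"]["price"])        # 15000
print(json_data.get("model", "No model"))  # .get() guards against missing keys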
The vehicle details live in a script tag of type application/ld+json. Next, we need a function similar to start_scrape(). Instead of processing a sequential list of pages, this function will load the URLs from our CSV file into an array and apply process_item() to each one. Below is the process_results() function.

def process_results(csv_file, location, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_item(row, location, retries=retries)
import os import csv import json import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from dataclasses import dataclass, field, fields, asdict from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Helper function to get the ScrapeOps proxy URL def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } return "https://proxy.scrapeops.io/v1/?" + urlencode(payload) # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the SearchData dataclass @dataclass class SearchData: name: str = "" url: str = "" price: str = "" currency: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) setattr(self, field.name, value.strip() if value else f"No {field.name}") # Define the DataPipeline class class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = self.storage_queue[:] self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Main scraping function def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location)) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] # Store data in SearchData and add to pipeline search_data = SearchData(name=name, url=link, price=price, currency=currency) data_pipeline.add_data(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 driver.quit() if not success: raise Exception(f"Max retries exceeded for {url}") # Function to start the scraping process def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(url) script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") json_data = json.loads(script_element.get_attribute("text")) 
print(json_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries - tries}") finally: driver.quit() tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, retries=3): logger.info(f"Processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["ford mustang"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
Since we already have a working DataPipeline, all we need is an additional dataclass. This new dataclass will be named VehicleData. Check out VehicleData below.

@dataclass
class VehicleData:
    name: str = ""
    description: str = ""
    price: int = 0
    currency: str = ""
    brand: str = ""
    model: str = ""
    year: str = ""
    mileage: int = 0
    transmission: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                value = getattr(self, field.name)
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    setattr(self, field.name, value.strip())
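As an illustration (with made-up data, assuming the VehicleData class above is defined), this is how a parsed blob maps onto VehicleData; .get() with defaults keeps the scraper from crashing on listings that lack vehicle fields, such as ads for Mustang parts:

json_data = {
    "name": "Ford Mustang GT",
    "offers": {"price": 15000, "priceCurrency": "EUR"},
    "brand": {"name": "Ford"},
}

vehicle = VehicleData(
    name=json_data.get("name", "No name"),
    price=json_data.get("offers", {}).get("price", 0),
    currency=json_data.get("offers", {}).get("priceCurrency", "No currency"),
    brand=json_data.get("brand", {}).get("name", "No brand"),
    model=json_data.get("model", "No model"),
    mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)),
)
print(vehicle.model)  # "No model"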
DataPipeline within process_item() and pass VehicleData into it.import os import re import csv import json import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from dataclasses import dataclass, field, fields, asdict from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Helper function to get the ScrapeOps proxy URL def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } return "https://proxy.scrapeops.io/v1/?" + urlencode(payload) # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the SearchData dataclass @dataclass class SearchData: name: str = "" url: str = "" price: str = "" currency: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) setattr(self, field.name, value.strip() if value else f"No {field.name}") @dataclass class VehicleData: name: str = "" description: str = "" price: int = 0 currency: str = "" brand: str = "" model: str = "" year: str = "" mileage: int = 0 transmission: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) # Define the DataPipeline class class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = self.storage_queue[:] self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Main scraping function def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location)) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] # Store data in SearchData and add to pipeline search_data = SearchData(name=name, url=link, price=price, currency=currency) data_pipeline.add_data(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 driver.quit() if not success: raise Exception(f"Max retries exceeded for {url}") # Function to start the scraping process def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(url) script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") json_data = json.loads(script_element.get_attribute("text")) 
print(json_data) safe_filename = re.sub(r'[<>:"/|?*x00-x1F]', '_', row['name']) # Replace invalid characters with '_' vehicle_pipeline = DataPipeline(f"{safe_filename}.csv") #added some checks because not all ads will be same, some ads may have same keywords but not exactly about that thing, for example in this case, some ads are for parts of Ford Mustang vehicle_data = VehicleData( name=json_data.get("name", "No name"), description=json_data.get("description", "No description"), price=json_data.get("offers", {}).get("price", 0), currency=json_data.get("offers", {}).get("priceCurrency", "No currency"), brand=json_data.get("brand", {}).get("name", "No brand"), model=json_data.get("model", "No model"), year=json_data.get("vehicleModelDate", "No year"), mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)), transmission=json_data.get("vehicleTransmission", "No transmission") ) vehicle_pipeline.add_data(vehicle_data) vehicle_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries - tries}") finally: driver.quit() tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, retries=3): logger.info(f"Processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_item(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["ford mustang"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
VehicleData represents the detailed information extracted when scraping individual listings. Just like SearchData, it is saved to a CSV file through the DataPipeline. The last step is to replace the for loop in process_results() with ThreadPoolExecutor. Check out the code snippet below.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_item,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
process_item is the function we want to execute across multiple threads.process_item are passed in as arrays, just as we did previously.get_scrapeops_url() in another part of the code.This time, it will be used on the response within the process_item() function.driver.get(get_scrapeops_url(url, location))import os import re import csv import json import logging from urllib.parse import urlencode from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from dataclasses import dataclass, field, fields, asdict from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Helper function to get the ScrapeOps proxy URL def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } return "https://proxy.scrapeops.io/v1/?" + urlencode(payload) # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the SearchData dataclass @dataclass class SearchData: name: str = "" url: str = "" price: str = "" currency: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) setattr(self, field.name, value.strip() if value else f"No {field.name}") @dataclass class VehicleData: name: str = "" description: str = "" price: int = 0 currency: str = "" brand: str = "" model: str = "" year: str = "" mileage: int = 0 transmission: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): value = getattr(self, field.name) if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) # Define the DataPipeline class class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = self.storage_queue[:] self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() # Main scraping function def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.leboncoin.fr/recherche?text={formatted_keyword}&page={page_number+1}" tries = 0 success = False # Configure Selenium WebDriver chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location)) logger.info(f"Opened URL: {url}") # Wait for results to load WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "a[data-test-id='ad']")) ) link_cards = driver.find_elements(By.CSS_SELECTOR, "a[data-test-id='ad']") for card in link_cards: href = card.get_attribute("href") link = href.replace("https://proxy.scrapeops.io/", "https://www.leboncoin.fr/") try: name_element = card.find_element(By.TAG_NAME, "p") name = name_element.get_attribute("title").replace("/", "-").replace(" ", "-") price_element = card.find_element(By.CSS_SELECTOR, "span[data-qa-id='aditem_price']") price_string = price_element.text price = price_string[:-1] currency = price_string[-1] # Store data in SearchData and add to pipeline search_data = SearchData(name=name, url=link, price=price, currency=currency) data_pipeline.add_data(search_data) except NoSuchElementException as e: logger.warning(f"Failed to extract some details for a card: {e}") logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException as e: logger.error(f"Timeout occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") finally: tries += 1 driver.quit() if not success: raise Exception(f"Max retries exceeded for {url}") # Function to start the scraping process def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_item(row, location, retries=3): url = row["url"] tries = 0 success = False chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--window-size=1920,1080") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(get_scrapeops_url(url, location)) script_element = driver.find_element(By.CSS_SELECTOR, "script[type='application/ld+json']") json_data = 
json.loads(script_element.get_attribute("text")) print(json_data) safe_filename = re.sub(r'[<>:"/|?*x00-x1F]', '_', row['name']) # Replace invalid characters with '_' vehicle_pipeline = DataPipeline(f"{safe_filename}.csv") #added some checks because not all ads will be same, some ads may have same keywords but not exactly about that thing, for example in this case, some ads are for parts of Ford Mustang vehicle_data = VehicleData( name=json_data.get("name", "No name"), description=json_data.get("description", "No description"), price=json_data.get("offers", {}).get("price", 0), currency=json_data.get("offers", {}).get("priceCurrency", "No currency"), brand=json_data.get("brand", {}).get("name", "No brand"), model=json_data.get("model", "No model"), year=json_data.get("vehicleModelDate", "No year"), mileage=int(json_data.get("mileageFromOdometer", {}).get("value", 0)), transmission=json_data.get("vehicleTransmission", "No transmission") ) vehicle_pipeline.add_data(vehicle_data) vehicle_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries - tries}") finally: driver.quit() tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_item, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["ford mustang"] aggregate_files = [] for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Our scraper is now complete. As before, you can tune the constants inside main before running it in production:

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info("Crawl starting...")

    keyword_list = ["ford mustang"]
    aggregate_files = []

    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info("Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
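If you want a broader run, you might, for example, adjust the settings like this (the values below are purely illustrative, and "uk" is one of the country codes mentioned earlier):

# Illustrative alternative settings, not recommendations.
MAX_RETRIES = 5
MAX_THREADS = 8
PAGES = 10
LOCATION = "uk"
keyword_list = ["ford mustang", "ford mustang gt"]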
Like most websites, Leboncoin publishes Terms and Conditions and a robots.txt file that outline its expectations for users. Ignoring these policies could result in being banned from the site. You can review these policies through the links below. NOTE: The Terms and Conditions are in French!