Then check out ScrapeOps, the complete toolkit for web scraping.
You'll need a ScrapeOps API key saved in a config.json file.

```json
{"api_key": "your-super-secret-api-key"}
```
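For reference, the scripts in this article load the key at startup with json. The existence check below is an optional extra (not part of the original code) that fails fast with a clearer error:

```python
import json
import os

CONFIG_FILE = "config.json"  # assumed to sit next to the script

if not os.path.isfile(CONFIG_FILE):
    raise FileNotFoundError(f"Missing {CONFIG_FILE}. Create it with your ScrapeOps api_key.")

with open(CONFIG_FILE, "r") as config_file:
    config = json.load(config_file)

API_KEY = config["api_key"]
```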
Here is the finished production code:

```python
import os
import csv
import requests
import json
import logging
import time  # used by DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "render_js": True,
        "bypass": "generic_level_3",
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    price: str = ""
    size: str = ""
    date_available: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class CostData:
    name: str = ""
    cold_rent: str = ""
    price_per_m2: str = ""
    additional_costs: str = ""
    total_cost: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
    base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
    url = ""
    if page_number != 0:
        url = f"{base_url}?pagenumber={page_number+1}"
    else:
        url = base_url

    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="result-list-entry__data")
            if not div_cards:
                raise Exception("Listings failed to load!")

            for card in div_cards:
                name = card.find("div", class_="result-list-entry__address font-ellipsis").text
                href = card.find("a").get("href")
                link = ""
                prefix = "https://www.immobilienscout24.de"
                # Cards whose href already contains the full prefix are ads, so we skip them
                if prefix in href:
                    continue
                else:
                    link = f"{prefix}{href}"

                attributes_card = card.select_one("div[data-is24-qa='attributes']")
                attributes = attributes_card.find_all("dl")
                price = attributes[0].text.replace("Kaltmiete", "")
                size = attributes[1].text.replace("Wohnfläche", "")
                date_available = "n/a"
                date_text = attributes[2].find("dd").text
                if "Zi" not in date_text:
                    date_available = date_text

                search_data = SearchData(
                    name=name,
                    price=price,
                    size=size,
                    date_available=date_available,
                    url=link
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv")
                cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
                price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
                    .replace("Kalkuliert von ImmoScout24", "").strip()
                additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
                heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
                total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()

                cost_data = CostData(
                    name=row["name"],
                    cold_rent=cold_rent,
                    price_per_m2=price_per_m2,
                    additional_costs=additional_costs,
                    total_cost=total_cost
                )
                costs_pipeline.add_data(cost_data)
                costs_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "de"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = [{"state": "bayern", "city": "muenchen"}]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = f"{keyword['state']}-{keyword['city']}"

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
If you'd like to tweak your results, feel free to change any of the following constants from main:

- MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
- MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
- PAGES: Determines how many pages of search results to scrape for each keyword.
- LOCATION: Specifies the geographical location (country) the requests will be routed through.
- keyword_list: This is a list of keywords (state and city pairs) for which the script will perform the search and subsequent scraping.

When the scraper runs, each individual listing also gets its own cost report, saved as COST-{name-of-property}.csv.

Here is an example search URL:

https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten

bayern (Bavaria) is our state, and muenchen (Munich) is the city we're searching. Our reusable URL format looks like this:

https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten
Each of our search results is a div with the class result-list-entry__data. If we find all of these div items, we can go through and pull all of the data from these items. On individual listing pages, our cost data comes from dd elements on the page.

Pagination is handled with a simple query parameter. Here is the URL for page 1:

https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten

And here is the URL for page 2:

https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten?pagenumber=2

Page 2 adds ?pagenumber=2. This is something unique to Immobilienscout24: passing ?pagenumber=1 actually trips their anti-bot system. Take a look at the shot below. On the first page, we omit the pagenumber parameter. For all other pages, we need to include it.

With the ScrapeOps proxy, we can use the country param to be routed through the country of our choice. In this case, we want to appear in Germany, so we'll pass "country": "de". You can view all of our available country codes here.

Now, let's set up our project. Create a new project folder, build and activate a virtual environment, and install our dependencies.

```bash
mkdir immobilienscout24-scraper
cd immobilienscout24-scraper

python -m venv venv
source venv/bin/activate

pip install requests
pip install beautifulsoup4
```
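Before writing any parsing logic, you can optionally smoke-test the environment with a throwaway request. This is just a sketch; without a proxy, Immobilienscout24 may return a blocked or non-200 response, which is exactly the problem the ScrapeOps integration solves later on:

```python
import requests
from bs4 import BeautifulSoup

url = "https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten"
response = requests.get(url)
print("Status:", response.status_code)

# If the request succeeded, print the page title to confirm parsing works
if response.status_code == 200:
    soup = BeautifulSoup(response.text, "html.parser")
    print(soup.title.text if soup.title else "No title found")
```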
In our first iteration, we add our parsing function along with retries, error handling, and a main
block.All of this is important, but if you're here to learn scraping, the parsing function is by far the most important.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(search_info, location, retries=3): url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = { "name": name, "price": price, "size": size, "date_available": date_available, "url": link } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "de" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] ## Job Processes for keyword in keyword_list: scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
card.find("div", class_="result-list-entry__address font-ellipsis").text gives us the address or name of each property. We find each listing's href with card.find("a").get("href"). We then check whether the href includes our prefix: "https://www.immobilienscout24.de". If it does, this is an ad, so we skip it with continue. card.select_one("div[data-is24-qa='attributes']") gives us our attributes_card, and attributes_card.find_all("dl") gives us all of our individual attributes.

To paginate our results, we append ?pagenumber={page_number+1}. If we're on page 1, our URL looks like this:

https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten

On every other page, it looks like this:

https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten?pagenumber={page_number+1}
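Put together, the URL construction inside the parsing function follows the pattern below. The build_search_url() helper is just an illustrative name; the real scraper builds the URL inline:

```python
def build_search_url(search_info, page_number):
    # Mirrors the logic used in scrape_search_results():
    # page 0 (the first page) omits the pagenumber parameter entirely,
    # every later page gets pagenumber=page_number+1.
    base_url = (
        "https://www.immobilienscout24.de/Suche/de/"
        f"{search_info['state']}/{search_info['city']}/wohnung-mieten"
    )
    if page_number != 0:
        return f"{base_url}?pagenumber={page_number + 1}"
    return base_url

print(build_search_url({"state": "bayern", "city": "muenchen"}, 0))
print(build_search_url({"state": "bayern", "city": "muenchen"}, 1))
```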
To support pagination, we also add a new function, start_scrape().

```python
def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
```
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(search_info, location, page_number, retries=3): base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = "" if page_number != 0: url = f"{base_url}?pagenumber={page_number+1}" else: url = base_url tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = { "name": name, "price": price, "size": size, "date_available": date_available, "url": link } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "de" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] ## Job Processes for keyword in keyword_list: start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
start_scrape() now gives us the ability to call scrape_search_results() on multiple pages.

Next, we need a DataPipeline and a dataclass to represent our search results. Here is our dataclass; we call it SearchData.

```python
@dataclass
class SearchData:
    name: str = ""
    price: str = ""
    size: str = ""
    date_available: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
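To see what __post_init__ buys us, here is a quick demonstration of the cleanup behavior (assuming the dataclass above is already defined):

```python
item = SearchData(
    name="  Schwabing, München  ",  # gets stripped
    price="1.200 €",
    size="",                        # empty, becomes "No size"
    date_available="sofort",
    url="https://www.immobilienscout24.de/expose/example"
)
print(item.name)  # "Schwabing, München"
print(item.size)  # "No size"
```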
Here is our DataPipeline. It opens a pipe to a CSV file. Then, we feed it SearchData objects. The pipeline will filter out duplicate objects by their name attribute. All non-duplicate items get saved to the CSV file.

```python
# NOTE: close_pipeline() calls time.sleep(), so the full script also needs import time
class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
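Used on its own, the pipeline works like this. This is a minimal sketch with made-up values; in the real crawler, main and the parsing function do this wiring:

```python
pipeline = DataPipeline(csv_filename="test-output.csv")

pipeline.add_data(SearchData(name="Listing A", price="900 €", size="40 m²",
                             url="https://www.immobilienscout24.de/expose/a"))
# Same name again: flagged as a duplicate and dropped with a warning
pipeline.add_data(SearchData(name="Listing A", price="900 €", size="40 m²",
                             url="https://www.immobilienscout24.de/expose/a"))

pipeline.close_pipeline()  # flushes anything still in the queue to test-output.csv
```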
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: str = "" size: str = "" date_available: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = "" if page_number != 0: url = f"{base_url}?pagenumber={page_number+1}" else: url = base_url tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = SearchData( name=name, price=price, size=size, date_available=date_available, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "de" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = f"{keyword['state']}-{keyword['city']}" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Inside of main, we open a DataPipeline and pass it into start_scrape(). Inside the parsing function, instead of printing our results, we now turn each one into a SearchData object and pass that object into the DataPipeline.

At the moment, we use start_scrape() to crawl a list of pages using a for loop. When we add concurrency to our crawler, it will be able to crawl multiple pages at the same time (concurrently). We accomplish this by replacing our for loop with a call to ThreadPoolExecutor. Here is our refactored version of start_scrape().

```python
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
```
Pay attention to the arguments we pass into executor.map(); they replace our for loop.

- scrape_search_results: the function we want to call on each available thread.
- All of our other arguments are the args we wish to pass into scrape_search_results. We pass them in as arrays. Then, executor.map() passes them into scrape_search_results.
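If the array arguments look strange, this small standalone example shows how executor.map() lines the iterables up, taking one element from each per call:

```python
import concurrent.futures

def show(page, keyword, retries):
    print(f"page={page}, keyword={keyword}, retries={retries}")

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(
        show,
        range(pages),          # 0, 1, 2
        ["muenchen"] * pages,  # same keyword for every call
        [3] * pages            # same retry count for every call
    )
```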
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: str = "" size: str = "" date_available: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = "" if page_number != 0: url = f"{base_url}?pagenumber={page_number+1}" else: url = base_url tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = SearchData( name=name, price=price, size=size, date_available=date_available, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "de" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = f"{keyword['state']}-{keyword['city']}" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Our proxy function is get_scrapeops_url(). It takes in a url and location, then it spits out a proxied URL. Take a look at the payload in the function below. These are all of the parameters that get sent to the ScrapeOps server to tell it what we want to do.

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "render_js": True,
        "bypass": "generic_level_3",
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```

- "api_key": your ScrapeOps API key.
- "url": the url of the website you want to scrape.
- "render_js": we want ScrapeOps to open a real browser and render JavaScript content on the page.
- "bypass": the level of anti-bot system we wish to bypass.
- "country"
: the country we want to appear in.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "render_js": True, "bypass": "generic_level_3", "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: str = "" size: str = "" date_available: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = "" if page_number != 0: url = f"{base_url}?pagenumber={page_number+1}" else: url = base_url tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = SearchData( name=name, price=price, size=size, date_available=date_available, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "de" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = f"{keyword['state']}-{keyword['city']}" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Time to run our crawler in production. Feel free to change any of the following constants in main:

- MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
- MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
- PAGES: Determines how many pages of search results to scrape for each keyword.
- LOCATION: Specifies the geographical location (country) the requests will be routed through.
- keyword_list: This is a list of keywords for which the script will perform the search and subsequent scraping.

Here is our updated main.

```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "de"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = [{"state": "bayern", "city": "muenchen"}]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = f"{keyword['state']}-{keyword['city']}"

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
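Before launching a full production run, it can be worth spending one request to confirm your API key and country settings are accepted. This is an optional sketch that reuses the get_scrapeops_url() function from above:

```python
test_url = "https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten"
response = requests.get(get_scrapeops_url(test_url, location="de"))
print("Proxy status:", response.status_code)  # expect 200 when the key and plan are valid
```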
Next, we write the parsing function for our scraper, process_listing(). It pulls the cost information from an individual listing page.

```python
def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
                price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
                    .replace("Kalkuliert von ImmoScout24", "").strip()
                additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
                heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
                total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()

                cost_data = {
                    "name": row["name"],
                    "cold_rent": cold_rent,
                    "price_per_m2": price_per_m2,
                    "additional_costs": additional_costs,
                    "total_cost": total_cost
                }
                print(cost_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
All of our cost data comes from dd elements with slightly different classes. You can see them listed below.

- cold_rent: is24qa-kaltmiete grid-item three-fifths
- price_per_m2: is24qa-preism² grid-item three-fifths
- additional_costs: is24qa-nebenkosten grid-item three-fifths
- heating_costs: is24qa-heizkosten grid-item three-fifths
- total_cost: is24qa-gesamtmiete grid-item three-fifths font-bold
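As a concrete example, the cold rent comes from the first class in that list. The snippet below runs the same selector against a stripped-down stand-in page, just to show the pattern:

```python
from bs4 import BeautifulSoup

# A minimal stand-in for a listing page, only to demonstrate the selector
sample_html = """
<dl><dt>Kaltmiete</dt><dd class="is24qa-kaltmiete grid-item three-fifths"> 1.200 € </dd></dl>
"""
soup = BeautifulSoup(sample_html, "html.parser")

cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
print(cold_rent)  # "1.200 €"
```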
Next, we need a function that reads the CSV file produced by our crawler, process_results(). It reads the CSV file into an array of dict objects and then runs process_listing() on each row that we read into the array.

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_listing(row, location, retries=retries)
```
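Because csv.DictReader keys each row by the CSV header, every row handed to process_listing() is a plain dict with the same fields as SearchData. Assuming you ran the crawl with the default keyword_list, you can peek at the rows like this:

```python
import csv

# assumes the crawler already produced bayern-muenchen.csv
with open("bayern-muenchen.csv", newline="") as file:
    for row in csv.DictReader(file):
        print(row["name"], row["url"])  # keys: name, price, size, date_available, url
        break  # just peek at the first row
```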
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "render_js": True, "bypass": "generic_level_3", "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: str = "" size: str = "" date_available: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = "" if page_number != 0: url = f"{base_url}?pagenumber={page_number+1}" else: url = base_url tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = SearchData( name=name, price=price, size=size, date_available=date_available, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip() price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\ .replace("Kalkuliert von ImmoScout24", "").strip() additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip() heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip() total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip() cost_data = { "name": row["name"], "cold_rent": cold_rent, "price_per_m2": price_per_m2, "additional_costs": 
additional_costs, "total_cost": total_cost } print(cost_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_listing(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "de" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = f"{keyword['state']}-{keyword['city']}" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
process_results() reads our CSV file into an array of dict objects. process_results() also iterates through that array and calls process_listing() on each row from the array.

We already have a working DataPipeline class. All we need is another dataclass to pass into it. Then, we'll be able to generate a cost report for each listing we scrape. This one will be called CostData.

```python
@dataclass
class CostData:
    name: str = ""
    cold_rent: str = ""
    price_per_m2: str = ""
    additional_costs: str = ""
    total_cost: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
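One practical caveat: the cost pipeline's filename is built from the listing name, which is a street address and can contain characters (like slashes) that aren't filesystem-safe. The original code uses the name as-is; if you run into bad filenames, a small helper along these lines is an easy, optional addition (safe_filename() is a hypothetical name, not part of the article's code):

```python
import re

def safe_filename(name):
    # Replace anything that isn't a letter, digit, underscore, or dash with "-"
    return re.sub(r"[^\w\-]+", "-", name).strip("-")

# e.g. DataPipeline(csv_filename=f"COST-{safe_filename(row['name'])}.csv")
print(safe_filename("Leopoldstraße 10 / Schwabing, München"))
```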
In the code below, we open a new DataPipeline in our parsing function. Then we pass CostData
into it and close the pipeline once the parse is finished.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "render_js": True, "bypass": "generic_level_3", "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: str = "" size: str = "" date_available: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CostData: name: str = "" cold_rent: str = "" price_per_m2: str = "" additional_costs: str = "" total_cost: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = "" if page_number != 0: url = f"{base_url}?pagenumber={page_number+1}" else: url = base_url tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = SearchData( name=name, price=price, size=size, date_available=date_available, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv") cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip() price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\ .replace("Kalkuliert von ImmoScout24", "").strip() additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip() heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip() total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip() cost_data = CostData( name=row["name"], 
cold_rent=cold_rent, price_per_m2=price_per_m2, additional_costs=additional_costs, total_cost=total_cost ) costs_pipeline.add_data(cost_data) costs_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_listing(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "de" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = f"{keyword['state']}-{keyword['city']}" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
- We now open a second DataPipeline within our parsing function.
- We turn our extracted data into CostData and pass it into the pipeline.

Like before, we'll add concurrency using ThreadPoolExecutor. In this next snippet, we once again replace a for loop with a call to ThreadPoolExecutor.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
- process_listing is the function we want to call on each available thread.
- All of our other arguments to executor.map() are the args we wish to pass into process_listing. We once again pass them in as arrays, and executor.map() passes them into process_listing.

Finally, to route the scraper through the proxy as well, we update our response.

```python
response = requests.get(get_scrapeops_url(url, location=location))
```
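One optional hardening step that is not in the original code: proxied requests that render JavaScript can be slow, so passing requests' timeout parameter lets a hung connection trip the existing retry logic instead of tying up a thread indefinitely. A value like 60 seconds is only an example:

```python
# drop-in variant of the line above; tune the timeout to your plan and page sizes
response = requests.get(get_scrapeops_url(url, location=location), timeout=60)
```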
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "render_js": True, "bypass": "generic_level_3", "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: str = "" size: str = "" date_available: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CostData: name: str = "" cold_rent: str = "" price_per_m2: str = "" additional_costs: str = "" total_cost: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = "" if page_number != 0: url = f"{base_url}?pagenumber={page_number+1}" else: url = base_url tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = SearchData( name=name, price=price, size=size, date_available=date_available, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv") cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip() price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\ .replace("Kalkuliert von ImmoScout24", "").strip() additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip() heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip() total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip() cost_data = CostData( 
name=row["name"], cold_rent=cold_rent, price_per_m2=price_per_m2, additional_costs=additional_costs, total_cost=total_cost ) costs_pipeline.add_data(cost_data) costs_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_listing, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "de" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = f"{keyword['state']}-{keyword['city']}" crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
PAGES: Determines how many pages of Immobilienscout24 search results to scrape for each location.
LOCATION: Specifies the country our proxied requests should appear to come from.
keyword_list: The list of state/city combinations for which the script will perform the search and subsequent scraping.
Here is our updated main. We're running a 3 page crawl and then scraping costs on all of the individual results.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "de"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = [{"state": "bayern", "city": "muenchen"}]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = f"{keyword['state']}-{keyword['city']}"

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Whenever you scrape a site, pay attention to its terms of service and its robots.txt. Immobilienscout24 explicitly prohibits scraping, and we did violate their policies in this scrape. Violating terms like these can result in suspension or even a permanent ban from the site. You can view links to these policies below.
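If you want to check a path against a site's robots.txt programmatically, Python's standard library can do it. A minimal sketch, not part of the scraper above:

# Checking robots.txt rules with the standard library (illustrative only)
from urllib.robotparser import RobotFileParser

robots = RobotFileParser()
robots.set_url("https://www.immobilienscout24.de/robots.txt")
robots.read()

search_page = "https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten"
print(robots.can_fetch("*", search_page))  # False means generic bots are disallowed on this path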
In this tutorial, you also got to see the ScrapeOps bypass feature first hand. Take this new knowledge and go build something cool! If you'd like to learn more about the tech used in this article, check out the links below. Then check out ScrapeOps, the complete toolkit for web scraping.
# Create a virtual environment (recommended)
python -m venv venv
source venv/bin/activate  # or `venv\Scripts\activate` on Windows

# Install dependencies
pip install selenium beautifulsoup4 python-dotenv
# Create .env file and add your ScrapeOps key
echo "SCRAPEOPS_API_KEY=your-key-here" > .env
# Key parameters you can modify:
MAX_THREADS = 2     # Concurrent browser instances
MAX_RETRIES = 3     # Retry attempts per page
PAGES = 1           # Pages to scrape per location
LOCATION = "de"     # Target country code

# Define your search areas
keyword_list = [
    {"state": "bayern", "city": "muenchen"},
    # Add more locations as needed
]
# ..............................create a custom logger................................ import logging logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # ..............................bypass bots................................ from dotenv import load_dotenvfrom urllib.parse import urlencodeimport os load_dotenv()API_KEY = os.getenv('SCRAPEOPS_API_KEY') def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "render_js": True, "bypass": "generic_level_3", "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # ..............................create data storage classes................................ from dataclasses import dataclass, fields, asdictimport csvimport time @dataclassclass SearchData: name: str = "" price: str = "" size: str = "" date_available: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CostData: name: str = "" cold_rent: str = "" price_per_m2: str = "" additional_costs: str = "" total_cost: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() # ..............................create page crawler................................ 
from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.common.exceptions import TimeoutException, WebDriverExceptionfrom bs4 import BeautifulSoup def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3, timeout=10): # Construct the URL based on search parameters base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = f"{base_url}?pagenumber={page_number+1}" if page_number != 0 else base_url scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode tries = 0 success = False while tries <= retries and not success: try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) response = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) time.sleep(2) soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = SearchData( name=name, price=price, size=size, date_available=date_available, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except (TimeoutException, WebDriverException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 # ..............................run crawler using concurrency................................ import concurrent.futures def crawl_pages(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) # ..............................scrape each page................................ 
def scrape_each_page(row, location, retries=3, timeout=10): url = row["url"] tries = 0 success = False scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode while tries <= retries and not success: try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) response = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) time.sleep(1) logger.info("......................resting for a sec before proceeding...........................") soup = BeautifulSoup(response.text, "html.parser") costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv") cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip() price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\ .replace("Kalkuliert von ImmoScout24", "").strip() additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip() total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip() cost_data = CostData( name=row["name"], cold_rent=cold_rent, price_per_m2=price_per_m2, additional_costs=additional_costs, total_cost=total_cost ) costs_pipeline.add_data(cost_data) costs_pipeline.close_pipeline() success = True except (TimeoutException, WebDriverException) as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") # ..............................run the scraper using concurrency................................ def scrape_pages(csv_file, location, max_threads=2, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_each_page, reader, [location] * len(reader), [retries] * len(reader) ) # ..............................example production run................................ if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 1 LOCATION = "de" ## INPUT ---> List of keywords to crawl the page for keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] logger.info(f"Crawl starting...") for keyword in keyword_list: filename = f"{keyword['state']}-{keyword['city']}.csv" aggregate_files.append(filename) crawl_pipeline = DataPipeline(csv_filename=filename) crawl_pages(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info(f"Scrape starting...") for file in aggregate_files: scrape_pages(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info(f"Scrape complete.")
The script uses two dataclasses, SearchData and CostData, to define the structure for listing information and cost details. A DataPipeline class manages data storage, checking for duplicates and periodically saving data to CSV files. scrape_search_results and scrape_each_page fetch listing data from the search and detail pages, using Selenium to load pages and BeautifulSoup to parse the HTML content. Both stages are parallelized with ThreadPoolExecutor, allowing us to scrape multiple pages concurrently; this is implemented in crawl_pages and scrape_pages.
Take a look at this example URL:

https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten

bayern (Bavaria) is our state, and muenchen (Munich) is the city we're searching. The URLs follow this format:

https://www.immobilienscout24.de/Suche/de/{state}/{city}/wohnung-mieten
The listing data we want sits in div elements with the class result-list-entry__data, and the cost details on each listing page sit in dd elements with specific classes.

Pagination is handled through the URL. Here are pages 1 and 2 of our Munich search:

https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten
https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten?pagenumber=2

They end in ../wohnung-mieten and ../wohnung-mieten?pagenumber=2 respectively; the only difference is ?pagenumber=2. This is something unique to Immobilienscout24: passing ?pagenumber=1 explicitly actually trips their anti-bot system, so we only append the parameter from the second page onward.

When we send requests through the ScrapeOps proxy, we can also set the country param to be routed through the country of our choice. You can view all of our available country codes here. To make our scraper more resilient and maintain location authenticity, we route every request through a German location (country=de).
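To make the pagination rule concrete, here is a small sketch; the helper name build_search_url is ours and not part of the original script, but it mirrors how scrape_search_results builds its URLs:

# Hypothetical helper mirroring the pagination rule above (page_number is zero-indexed, as in our crawler)
def build_search_url(state, city, page_number=0):
    base_url = f"https://www.immobilienscout24.de/Suche/de/{state}/{city}/wohnung-mieten"
    # Page 1 gets no pagenumber parameter; later pages use ?pagenumber=N
    return base_url if page_number == 0 else f"{base_url}?pagenumber={page_number + 1}"

print(build_search_url("bayern", "muenchen"))      # first page
print(build_search_url("bayern", "muenchen", 1))   # second page -> ...?pagenumber=2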
# Create and navigate to project folder
mkdir scrapeops
cd scrapeops
# Create virtual environment
python -m venv venv
# Choose based on your OS
# For Unix/macOS:
source venv/bin/activate
# For Windows:
venv\Scripts\activate
# Core scraping packages
pip install selenium         # Browser automation
pip install beautifulsoup4   # HTML parsing
pip install python-dotenv    # Environment variable management
Next, create a .env file for your ScrapeOps API key:

# Create .env file
echo "SCRAPEOPS_API_KEY=your-key-here" > .env

Your project folder should now look like this:
scrapeops/
├── venv/
├── .env # API keys and configuration
└── main.py # Main scraping logic
from selenium import webdriver

# Set up headless Chrome
options = webdriver.ChromeOptions()
options.add_argument('--headless')

# Try to create a browser instance
try:
    driver = webdriver.Chrome(options=options)
    print("Selenium setup successful!")
    driver.quit()
except Exception as e:
    print(f"Setup error: {e}")
# ..............................create a custom logger................................

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ..............................bypass bots................................

from dotenv import load_dotenv
from urllib.parse import urlencode
import os

load_dotenv()
API_KEY = os.getenv('SCRAPEOPS_API_KEY')

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "render_js": True,
        "bypass": "generic_level_3",
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
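As a quick sanity check, you can print the proxy URL the helper builds for an example target; the URL below is just an illustration:

# Hypothetical sanity check of the proxy URL builder; the target URL is only an example
test_url = "https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten"
print(get_scrapeops_url(test_url, location="de"))
# Expect something like: https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.immobilienscout24.de%2F...&country=de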
In the code above, we set up a custom logger with the INFO level to track our program's execution. Then we created a get_scrapeops_url function that takes a target url and returns a modified proxy URL, using our API_KEY (loaded from environment variables) to access ScrapeOps' proxy service with JavaScript rendering and level 3 bot bypass protection. The function also accepts a location parameter (defaulting to "us") to specify which country the request should appear to come from.

Next, we'll create a SearchData dataclass to store listing details and a DataPipeline
class to filter and save data in a CSV format.from dataclasses import dataclass, fields, asdictimport csvimport time @dataclassclass SearchData: name: str = "" price: str = "" size: str = "" date_available: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv()
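Here is a minimal, hypothetical usage sketch of these two pieces; the filename and listing values are invented purely for illustration:

# Hypothetical usage of SearchData and DataPipeline (filename and values invented)
pipeline = DataPipeline(csv_filename="example-listings.csv", storage_queue_limit=50)

item = SearchData(
    name="Altbogenhausen, München",
    price="1.450 €",
    size="52 m²",
    date_available="",   # empty strings become "No date_available" via __post_init__
    url="https://www.immobilienscout24.de/expose/123456789",
)

pipeline.add_data(item)    # queued; written out once the queue fills...
pipeline.close_pipeline()  # ...or when the pipeline is closed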
Here we created a SearchData dataclass to store item details (name, price, size, date, URL) with automatic string cleaning and default values for empty fields. Then we built a DataPipeline class to manage data storage, which maintains a queue of items, prevents duplicates by tracking names, and automatically saves data to a CSV file when the queue reaches its limit (default 50 items). The pipeline includes methods to add new data (add_data), check for duplicates (is_duplicate), save to CSV (save_to_csv), and properly close the pipeline (close_pipeline), ensuring all remaining data is written before shutting down.

Next, we write scrape_search_results() to fetch and parse each listing, storing results in our DataPipeline
class. This function retries on failures and fetches pages using Selenium.from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.common.exceptions import TimeoutException, WebDriverExceptionfrom bs4 import BeautifulSoup def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3, timeout=10): # Construct the URL based on search parameters base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = f"{base_url}?pagenumber={page_number+1}" if page_number != 0 else base_url scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode tries = 0 success = False while tries <= retries and not success: try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) logger.info(".................request reached scrapeops_proxy_url.................") response = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) logger.info(".................response succeeded.................") time.sleep(2) logger.info("...............Resting for 2 sec before proceeding...................") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = SearchData( name=name, price=price, size=size, date_available=date_available, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except (TimeoutException, WebDriverException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1
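One caveat worth flagging: WebDriverWait(...).until(EC.presence_of_element_located(...)) returns a Selenium WebElement, and a WebElement's .text property holds the element's visible text rather than its raw HTML. If BeautifulSoup cannot find any listing cards with that hand-off, parsing the full rendered page source is a more conventional alternative. A small sketch of that swap, reusing the driver and timeout already defined inside the function:

# Alternative Selenium-to-BeautifulSoup hand-off (a suggestion, not the original code)
# Assumes the surrounding function's driver and timeout variables
WebDriverWait(driver, timeout).until(
    EC.presence_of_element_located((By.TAG_NAME, "body"))
)
soup = BeautifulSoup(driver.page_source, "html.parser")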
Here we wrote the scrape_search_results function to handle the complexity of scraping ImmobilienScout24 pages with bot protection. By using get_scrapeops_url along with ScrapeOps, we created a proxy URL that bypasses anti-bot measures. Inside the function, we used Selenium's WebDriverWait to ensure pages loaded fully before parsing, with retries set in case of timeouts or failures. Our scraper extracts key listing data like name, price, size, and date_available, which are each mapped to specific HTML elements on the page. Each extracted listing is saved as a SearchData instance and added to our data_pipeline for CSV output.

To crawl multiple pages at once, we once again use ThreadPoolExecutor:

import concurrent.futures

def crawl_pages(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
Finally, we write a main function to specify our settings and run the scraper in production:

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 2
    PAGES = 1
    LOCATION = "de"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to crawl the page for
    keyword_list = [{"state": "bayern", "city": "muenchen"}]

    ## Job Processes
    for keyword in keyword_list:
        filename = f"{keyword['state']}-{keyword['city']}.csv"

        crawl_pipeline = DataPipeline(csv_filename=filename)
        crawl_pages(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")
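Once the crawl finishes, the resulting CSV can be inspected like any other file. A small, hypothetical sketch of reading it back; the filename matches the keyword defined above and the columns mirror the SearchData fields:

# Hypothetical peek at the crawl output
import csv

with open("bayern-muenchen.csv", newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):
        print(row["name"], row["price"], row["size"], row["url"])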
Inside the __main__ section, we set parameters like MAX_RETRIES, MAX_THREADS, and PAGES to control our scraper's behavior. We specified locations to target by defining keyword_list, with entries such as "state": "bayern" and "city": "muenchen". For each location, we initialized a new DataPipeline, named the resulting CSV files based on each location, and used crawl_pages
to perform a multi-page crawl.Finally, we closed the data pipeline to save each file, logging the start and end of the crawl to confirm its successful completion.Here is the full code for the crawler:# ..............................create a custom logger................................ import logging logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # ..............................step1: bypass bots................................ from dotenv import load_dotenvfrom urllib.parse import urlencodeimport os load_dotenv()API_KEY = os.getenv('SCRAPEOPS_API_KEY') def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "render_js": True, "bypass": "generic_level_3", "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # ..............................step2: create data storage classes................................ from dataclasses import dataclass, fields, asdictimport csvimport time @dataclassclass SearchData: name: str = "" price: str = "" size: str = "" date_available: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() # ..............................step3: create page crawler................................ 
from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.common.exceptions import TimeoutException, WebDriverExceptionfrom bs4 import BeautifulSoup def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3, timeout=10): # Construct the URL based on search parameters base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = f"{base_url}?pagenumber={page_number+1}" if page_number != 0 else base_url scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode tries = 0 success = False while tries <= retries and not success: try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) logger.info(".................request reached scrapeops_proxy_url.................") response = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) logger.info(".................response succeeded.................") time.sleep(2) logger.info("...............Resting for 2 sec before proceeding...................") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = SearchData( name=name, price=price, size=size, date_available=date_available, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except (TimeoutException, WebDriverException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 # ..............................step4: add concurrency run................................ import concurrent.futures def crawl_pages(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) # ..............................step5: example production run................................ 
if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 1 LOCATION = "de" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to crawl the page for keyword_list = [{"state": "bayern", "city": "muenchen"}] ## Job Processes for keyword in keyword_list: filename = f"{keyword['state']}-{keyword['city']}.csv" crawl_pipeline = DataPipeline(csv_filename=filename) crawl_pages(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
INFO:__main__:Crawl starting...

DevTools listening on ws://127.0.0.1:63931/devtools/browser/b88abdd3-f295-427b-a4fc-7cf41ff5d11f
INFO:__main__:.................request reached scrapeops_proxy_url.................
INFO:__main__:.................response succeeded.................
INFO:__main__:...............Resting for 2 sec before proceeding...................
WARNING:__main__:Duplicate item found: Altbogenhausen, München. Item dropped.
WARNING:__main__:Duplicate item found: Amalie-Nacken-Str. 10, Freiham, München. Item dropped.
INFO:__main__:Successfully parsed data from: https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten
INFO:__main__:Crawl complete.
# ..............................create a custom logger................................ import logging logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # ..............................bypass bots................................ from dotenv import load_dotenvfrom urllib.parse import urlencodeimport os load_dotenv()API_KEY = os.getenv('SCRAPEOPS_API_KEY') def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "render_js": True, "bypass": "generic_level_3", "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url
As before, we set up a custom logger with the INFO level, allowing us to monitor the program's execution, log successful actions, and catch any errors or warnings without cluttering output with debug messages. To bypass bot protection, we created the get_scrapeops_url function, which generates a proxy URL with ScrapeOps settings. This function takes a url and a location parameter (defaulting to "us"). Using API_KEY, loaded via dotenv from our environment variables, the function builds a proxy URL with JavaScript rendering enabled and sets a high anti-bot bypass level (generic_level_3). This setup helps ensure that we avoid bot detection during each scrape request.

Next, we bring back the CostData dataclass and a DataPipeline
class to handle storage and duplicate-checking.from dataclasses import dataclass, fields, asdictimport csvimport time @dataclassclass CostData: name: str = "" cold_rent: str = "" price_per_m2: str = "" additional_costs: str = "" total_cost: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv()
Now we write scrape_each_page()
to extract relevant cost details from each listing:from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.common.exceptions import TimeoutException, WebDriverExceptionfrom bs4 import BeautifulSoup def scrape_each_page(row, location, retries=3, timeout=10): url = row["url"] tries = 0 success = False scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode while tries <= retries and not success: try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) response = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) time.sleep(1) logger.info("......................resting for a sec before proceeding...........................") soup = BeautifulSoup(response.text, "html.parser") costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv") cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip() price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\ .replace("Kalkuliert von ImmoScout24", "").strip() additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip() total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip() cost_data = CostData( name=row["name"], cold_rent=cold_rent, price_per_m2=price_per_m2, additional_costs=additional_costs, total_cost=total_cost ) costs_pipeline.add_data(cost_data) costs_pipeline.close_pipeline() success = True except (TimeoutException, WebDriverException) as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}")
Above, we wrote the scrape_each_page function to collect cost details for each property listing. First, we generate a proxy URL using the get_scrapeops_url function. This URL allows our requests to bypass bot protection while loading each listing's webpage from a specified location. Next, we load the webpage through Selenium's WebDriver in headless mode, ensuring the page renders completely before proceeding. Using BeautifulSoup, we then parse the loaded HTML to extract details such as cold_rent, price_per_m2, additional_costs, and total_cost by targeting their specific tags and classes. Finally, we store the parsed data in a CSV file named COST-{property_name}.csv using a DataPipeline instance, ensuring each property's cost data is saved in its own file. A retry mechanism handles any errors during loading, retrying the request if necessary.

Next, we once again use ThreadPoolExecutor
to run multiple threads concurrently:

import concurrent.futures

def scrape_pages(csv_file, location, max_threads=2, retries=3):
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                scrape_each_page,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
The scrape_pages function starts by loading data from the specified csv_file, which contains URLs and other information for each property. Each row in the file represents a property listing that needs to be scraped for cost data. We then use ThreadPoolExecutor from Python's concurrent.futures to handle multiple listings in parallel. By setting max_workers to max_threads, we control the number of threads running concurrently, allowing each thread to call scrape_each_page on a unique row of data from the CSV file. This setup includes passing additional arguments for location and retries, which are repeated for each listing. With this approach, scrape_pages
efficiently processes multiple listings at once, significantly speeding up the scraping process while managing concurrency.

To run the scraper in production, we tie it all together under main:

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 2
    LOCATION = "de"

    keyword_list = [{"state": "bayern", "city": "muenchen"}]
    aggregate_files = []

    for keyword in keyword_list:
        filename = f"{keyword['state']}-{keyword['city']}.csv"
        aggregate_files.append(filename)

    for file in aggregate_files:
        scrape_pages(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
# ..............................create a custom logger................................ import logging logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # ..............................step1: bypass bots................................ from dotenv import load_dotenvfrom urllib.parse import urlencodeimport os load_dotenv()API_KEY = os.getenv('SCRAPEOPS_API_KEY') def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "render_js": True, "bypass": "generic_level_3", "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # ..............................step2: create data storage classes................................ from dataclasses import dataclass, fields, asdictimport csvimport time @dataclassclass CostData: name: str = "" cold_rent: str = "" price_per_m2: str = "" additional_costs: str = "" total_cost: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() # ..............................step3: scrape individual pages................................ 
from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.common.exceptions import TimeoutException, WebDriverExceptionfrom bs4 import BeautifulSoup def scrape_each_page(row, location, retries=3, timeout=10): url = row["url"] tries = 0 success = False scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode while tries <= retries and not success: try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) response = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) time.sleep(1) logger.info("......................resting for a sec before proceeding...........................") soup = BeautifulSoup(response.text, "html.parser") costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv") cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip() price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\ .replace("Kalkuliert von ImmoScout24", "").strip() additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip() total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip() cost_data = CostData( name=row["name"], cold_rent=cold_rent, price_per_m2=price_per_m2, additional_costs=additional_costs, total_cost=total_cost ) costs_pipeline.add_data(cost_data) costs_pipeline.close_pipeline() success = True except (TimeoutException, WebDriverException) as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") # ..............................step4: add concurrency run................................ import concurrent.futures def scrape_pages(csv_file, location, max_threads=2, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_each_page, reader, [location] * len(reader), [retries] * len(reader) ) # ..............................step5: production run................................ if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 LOCATION = "de" keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] for keyword in keyword_list: filename = f"{keyword['state']}-{keyword['city']}.csv" aggregate_files.append(filename) for file in aggregate_files: scrape_pages(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
INFO:__main__:processing bayern-muenchen.csv DevTools listening on ws://127.0.0.1:64158/devtools/browser/7627ad38-5715-4107-8bcf-9aaa722e5c87 DevTools listening on ws://127.0.0.1:64166/devtools/browser/29bb3f6b-d1c9-4317-afa3-ec1e3ca237f1INFO:__main__:......................resting for a sec before proceeding...........................INFO:__main__:Successfully parsed: https://www.immobilienscout24.de/expose/154971106 DevTools listening on ws://127.0.0.1:64196/devtools/browser/de4a9a94-a707-4e87-8051-ea49ec011227INFO:__main__:......................resting for a sec before proceeding........................... DevTools listening on ws://127.0.0.1:64224/devtools/browser/175648b7-6df4-4574-92a8-ce6f50a5de81INFO:__main__:......................resting for a sec before proceeding...........................INFO:__main__:Successfully parsed: https://www.immobilienscout24.de/expose/155095135 DevTools listening on ws://127.0.0.1:64252/devtools/browser/6d5de803-3fff-40d3-b56d-8fc22a801c00INFO:__main__:......................resting for a sec before proceeding...........................INFO:__main__:Successfully parsed: https://www.immobilienscout24.de/expose/155095076 DevTools listening on ws://127.0.0.1:64279/devtools/browser/3e9ede0b-3236-43ec-9d04-14fbf72321e8INFO:__main__:......................resting for a sec before proceeding...........................INFO:__main__:Successfully parsed: https://www.immobilienscout24.de/expose/155095113 DevTools listening on ws://127.0.0.1:64301/devtools/browser/04c7d1dc-fd0c-4d24-a39d-8a846a974b0bINFO:__main__:......................resting for a sec before proceeding...........................INFO:__main__:Successfully parsed: https://www.immobilienscout24.de/expose/155095107 DevTools listening on ws://127.0.0.1:64320/devtools/browser/abb7249c-c1ca-4d1f-8e58-98cad50b8222INFO:__main__:......................resting for a sec before proceeding...........................INFO:__main__:Successfully parsed: https://www.immobilienscout24.de/expose/155095123 DevTools listening on ws://127.0.0.1:64345/devtools/browser/357ce9ae-c438-4046-af09-147e1a1f21a5INFO:__main__:......................resting for a sec before proceeding........................... DevTools listening on ws://127.0.0.1:64368/devtools/browser/28d4ec9d-7e45-4de3-8f53-b74b1e29f461INFO:__main__:......................resting for a sec before proceeding...........................INFO:__main__:Successfully parsed: https://www.immobilienscout24.de/expose/151571109INFO:__main__:......................resting for a sec before proceeding...........................
# ..............................create a custom logger................................ import logging logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # ..............................step1: bypass bots................................ from dotenv import load_dotenvfrom urllib.parse import urlencodeimport os load_dotenv()API_KEY = os.getenv('SCRAPEOPS_API_KEY') def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "render_js": True, "bypass": "generic_level_3", "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # ..............................step2: create data storage classes................................ from dataclasses import dataclass, fields, asdictimport csvimport time @dataclassclass SearchData: name: str = "" price: str = "" size: str = "" date_available: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CostData: name: str = "" cold_rent: str = "" price_per_m2: str = "" additional_costs: str = "" total_cost: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() # ..............................step3: create page crawler................................ 
from selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.common.exceptions import TimeoutException, WebDriverExceptionfrom bs4 import BeautifulSoup def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3, timeout=10): # Construct the URL based on search parameters base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten" url = f"{base_url}?pagenumber={page_number+1}" if page_number != 0 else base_url scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode tries = 0 success = False while tries <= retries and not success: try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) logger.info(".................request reached scrapeops_proxy_url.................") response = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) logger.info(".................response succeeded.................") time.sleep(2) logger.info("...............Resting for 2 sec before proceeding...................") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="result-list-entry__data") if not div_cards: raise Exception("Listings failed to load!") for card in div_cards: name = card.find("div", class_="result-list-entry__address font-ellipsis").text href = card.find("a").get("href") link = "" prefix = "https://www.immobilienscout24.de" if prefix in href: continue else: link = f"{prefix}{href}" attributes_card = card.select_one("div[data-is24-qa='attributes']") attributes = attributes_card.find_all("dl") price = attributes[0].text.replace("Kaltmiete", "") size = attributes[1].text.replace("Wohnfläche", "") date_available = "n/a" date_text = attributes[2].find("dd").text if "Zi" not in date_text: date_available = date_text search_data = SearchData( name=name, price=price, size=size, date_available=date_available, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except (TimeoutException, WebDriverException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 # ..............................step4: run crawler using concurrency................................ import concurrent.futures def crawl_pages(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) # ..............................step5: scrape individual pages................................ 
def scrape_each_page(row, location, retries=3, timeout=10): url = row["url"] tries = 0 success = False scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode while tries <= retries and not success: try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) response = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) time.sleep(1) logger.info("......................resting for a sec before proceeding...........................") soup = BeautifulSoup(response.text, "html.parser") costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv") cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip() price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\ .replace("Kalkuliert von ImmoScout24", "").strip() additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip() total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip() cost_data = CostData( name=row["name"], cold_rent=cold_rent, price_per_m2=price_per_m2, additional_costs=additional_costs, total_cost=total_cost ) costs_pipeline.add_data(cost_data) costs_pipeline.close_pipeline() success = True except (TimeoutException, WebDriverException) as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") # ..............................step6: run the scraper using concurrency................................ def scrape_pages(csv_file, location, max_threads=2, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_each_page, reader, [location] * len(reader), [retries] * len(reader) ) # ..............................example production run................................ if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 2 PAGES = 1 LOCATION = "de" ## INPUT ---> List of keywords to crawl the page for keyword_list = [{"state": "bayern", "city": "muenchen"}] aggregate_files = [] logger.info(f"Crawl starting...") for keyword in keyword_list: filename = f"{keyword['state']}-{keyword['city']}.csv" aggregate_files.append(filename) crawl_pipeline = DataPipeline(csv_filename=filename) crawl_pages(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info(f"Scrape starting...") for file in aggregate_files: scrape_pages(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info(f"Scrape complete.")
As always, respect each website's terms of service and its robots.txt file. Violating such terms can lead to consequences, including account suspension or permanent bans from the site.