

How to Scrape Immobilienscout24 With Requests and BeautifulSoup

Immobilienscout24 is a great place to look up rentals and real estate listings for sale in Germany.

Today, we're going to scrape rentals from Immobilienscout24. They use some of the most difficult anti-bot software we've run into during this series.


TLDR - How to Scrape Immobilienscout24

If you need to scrape Immobilienscout24, look no further! You can use our pre-built scraper below.

  1. Simply create a new folder with a config.json file.
  2. Inside the config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
  3. Then, copy/paste the code below into a Python file.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "render_js": True,
        "bypass": "generic_level_3",
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    price: str = ""
    size: str = ""
    date_available: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class CostData:
    name: str = ""
    cold_rent: str = ""
    price_per_m2: str = ""
    additional_costs: str = ""
    total_cost: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
    base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
    url = ""
    if page_number != 0:
        url = f"{base_url}?pagenumber={page_number+1}"
    else:
        url = base_url
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")

            div_cards = soup.find_all("div", class_="result-list-entry__data")
            if not div_cards:
                raise Exception("Listings failed to load!")

            for card in div_cards:
                name = card.find("div", class_="result-list-entry__address font-ellipsis").text
                href = card.find("a").get("href")
                link = ""
                prefix = "https://www.immobilienscout24.de"
                if prefix in href:
                    continue
                else:
                    link = f"{prefix}{href}"
                attributes_card = card.select_one("div[data-is24-qa='attributes']")
                attributes = attributes_card.find_all("dl")

                price = attributes[0].text.replace("Kaltmiete", "")
                size = attributes[1].text.replace("Wohnfläche", "")
                date_available = "n/a"
                date_text = attributes[2].find("dd").text
                if "Zi" not in date_text:
                    date_available = date_text

                search_data = SearchData(
                    name=name,
                    price=price,
                    size=size,
                    date_available=date_available,
                    url=link
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv")

                cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
                price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
                    .replace("Kalkuliert von ImmoScout24", "").strip()
                additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
                heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
                total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()

                cost_data = CostData(
                    name=row["name"],
                    cold_rent=cold_rent,
                    price_per_m2=price_per_m2,
                    additional_costs=additional_costs,
                    total_cost=total_cost
                )
                costs_pipeline.add_data(cost_data)
                costs_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "de"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = [{"state": "bayern", "city": "muenchen"}]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = f"{keyword['state']}-{keyword['city']}"

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

To change your results, go ahead and change any of the following:

  • MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
  • MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
  • PAGES: Determines how many pages of search results to scrape for each search area.
  • LOCATION: Specifies the country you want to appear in while making your requests.
  • keyword_list: A list of search areas (state and city) that the script will crawl and then scrape. An example with a second search area follows this list.
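
For example, if you also wanted to crawl apartments in Berlin, you could add a second search area. This is a minimal sketch; the berlin/berlin slugs are an assumption and need to match the state and city segments Immobilienscout24 uses in its search URLs:

# Each entry maps to https://www.immobilienscout24.de/Suche/de/{state}/{city}/wohnung-mieten
# NOTE: the "berlin"/"berlin" slugs are assumed -- verify them in your browser's address bar first.
keyword_list = [
    {"state": "bayern", "city": "muenchen"},
    {"state": "berlin", "city": "berlin"}
]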

After you run the code, you will have a single crawl report named after your search area (for example, bayern-muenchen.csv). Once this file pops up in your folder, the scraper will generate an individual cost report for each listing, named COST-{name-of-property}.csv.
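
If you'd like to spot-check the crawl report from Python rather than a spreadsheet, here is a minimal sketch using the standard csv module. It assumes the bayern-muenchen.csv file produced by the default settings above:

import csv

# Print the name, price, and URL of every listing in the crawl report.
with open("bayern-muenchen.csv", newline="", encoding="utf-8") as file:
    for row in csv.DictReader(file):
        print(row["name"], row["price"], row["url"])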


How To Architect Our Immobilienscout24 Scraper

We need to build two scrapers for this project: a search crawler and a listing scraper.

  1. The search crawler needs to perform a search and then save all of our results to a CSV file.
  2. The listing scraper will then read the CSV file, and scrape cost data for each of the listings we crawled.

Here is the build process for our search crawler:

  1. Parsing a search page.
  2. Controlling our search results with pagination.
  3. Storing our data.
  4. Concurrently running steps 1 through 3.
  5. Proxy Integration to bypass anti-bots.

We'll build the listing scraper using these steps:

  1. Parsing a listing page.
  2. Reading our CSV file.
  3. Storing our parsed listing data.
  4. Running steps 1 through 3 simultaneously with concurrency.
  5. Bypassing anti-bots with proxy integration.

Understanding How To Scrape Immobilienscout24

We need to get a better understanding of Immobilienscout24 and how it's laid out. We need to understand how to perform searches and where the data is located on each page.

In the next few sections, we're going to look at:

  • How To Request Pages
  • Extracting Data From These Pages
  • How To Control Pagination
  • How To Control Geolocation

Step 1: How To Request Immobilienscout24 Pages

Whenever you request a webpage, it begins with a GET request.

  • When you do it with your browser, under the hood, the browser makes a GET.
  • We can do this manually with Python Requests.
  • In both cases, we receive a response back in the form of an HTML page.
  • Our browser receives this response and renders the page for us to see. Our scraper needs to dig through the HTML and find our data.

When we perform a search, our URL looks like this:

https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten
  • bayern (Bavaria) is our state, and muenchen (Munich) is the city we're searching.

So, when we format our URLs, they look like this:

https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten
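
To make the URL format concrete, here is a minimal sketch that builds the search URL from a state and city and requests it with Python Requests. Without the proxy integration covered later, this plain request will usually be blocked by the site's anti-bot system, so treat it as an illustration of the URL structure only:

import requests

search_info = {"state": "bayern", "city": "muenchen"}
url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"

# A plain GET request; later in the article this gets routed through the ScrapeOps proxy.
response = requests.get(url)
print(response.status_code)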

Search Results Page

If you click on a listing from this search, you'll get a page that looks like this. As you scroll down, you'll see a cost table as shown in the second screenshot.

Listing Detail Page

Listing Detail Page


Step 2: How To Extract Data From Immobilienscout24 Results and Pages

Now that we know what these pages look like, we need to take a look at how to extract their data. First, we'll take a look at the search results page. Then we'll look at the individual listing page.

In the search results, each listing is embedded within a div with the class result-list-entry__data. If we find all of these div items, we can go through and pull all of the data from these items.

Search Results Page HTML Inspection
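
To see the selector in action without touching the live site, here is a minimal sketch run against a tiny made-up HTML snippet (the sample markup is an assumption that only mimics the class names, not the real page):

from bs4 import BeautifulSoup

# Made-up HTML that only mimics the class structure of the real results page.
sample_html = """
<div class="result-list-entry__data">Listing 1</div>
<div class="result-list-entry__data">Listing 2</div>
"""

soup = BeautifulSoup(sample_html, "html.parser")
div_cards = soup.find_all("div", class_="result-list-entry__data")
print(f"Found {len(div_cards)} listing cards")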

On our listings page, we want to find the costs section. All of our costs are embedded within dd elements on the page.

Listing Page HTML Inspection


Step 3: How To Control Pagination

Pagination is very easy to control. You saw our search URL earlier:

https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten

Take a look at the URL for page 2:

https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten?pagenumber=2

The only major difference is ?pagenumber=2. One quirk is unique to Immobilienscout24: passing ?pagenumber=1 actually trips their anti-bot system.

Take a look at the shot below.

Blocked Page

When we're looking at page 1 of our search, we need to omit the pagenumber parameter. For all other pages, we need to include it.
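
In code, that rule boils down to a small conditional. The build_search_url() helper below is just for illustration; inside our scraper the same logic lives directly in the parsing function:

base_url = "https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten"

def build_search_url(page_number):
    # page_number is zero-based: 0 -> no parameter, 1 -> ?pagenumber=2, and so on.
    if page_number != 0:
        return f"{base_url}?pagenumber={page_number+1}"
    return base_url

print(build_search_url(0))  # ...wohnung-mieten
print(build_search_url(1))  # ...wohnung-mieten?pagenumber=2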


Step 4: Geolocated Data

It's super easy to handle our geolocation using the ScrapeOps API. When using the ScrapeOps Proxy Aggregator, we can pass a country param to be routed through the country of our choice.

In this case, we want to appear in Germany, so we'll pass "country": "de".

You can view all of our available country codes here.
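
Inside the proxy function we build later, the country parameter is just one more key in the payload sent to ScrapeOps. Here is a stripped-down sketch of that payload:

from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"

payload = {
    "api_key": API_KEY,
    "url": "https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten",
    "country": "de",  # route the request through Germany
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)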


Setting Up Our Immobilienscout24 Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir immobilienscout24-scraper

cd immobilienscout24-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build An Immobilienscout24 Search Crawler

Now, it's time to start actually building! In the next few sections, we're going to build our crawler. We'll go about this in the following steps:

  1. Building a Parser
  2. Adding Pagination
  3. Storing Our Data
  4. Adding Concurrency
  5. Proxy Integration

Step 1: Create Simple Search Data Parser

We'll start by writing a script with our basic structure and a parsing function. In the code below, we create our basic structure: error handling, a parsing function with retry logic, and a main block.

All of this is important, but if you're here to learn scraping, the parsing function is by far the most important.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(search_info, location, retries=3):
    url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")

            div_cards = soup.find_all("div", class_="result-list-entry__data")
            if not div_cards:
                raise Exception("Listings failed to load!")

            for card in div_cards:
                name = card.find("div", class_="result-list-entry__address font-ellipsis").text
                href = card.find("a").get("href")
                link = ""
                prefix = "https://www.immobilienscout24.de"
                if prefix in href:
                    continue
                else:
                    link = f"{prefix}{href}"
                attributes_card = card.select_one("div[data-is24-qa='attributes']")
                attributes = attributes_card.find_all("dl")

                price = attributes[0].text.replace("Kaltmiete", "")
                size = attributes[1].text.replace("Wohnfläche", "")
                date_available = "n/a"
                date_text = attributes[2].find("dd").text
                if "Zi" not in date_text:
                    date_available = date_text

                search_data = {
                    "name": name,
                    "price": price,
                    "size": size,
                    "date_available": date_available,
                    "url": link
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "de"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = [{"state": "bayern", "city": "muenchen"}]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")
  • card.find("div", class_="result-list-entry__address font-ellipsis").text gives us the address or name of each property.
  • We find the href with card.find("a").get("href").
  • We check if our href includes our prefix: "https://www.immobilienscout24.de". If it does, this is an ad, so we skip it with continue.
  • card.select_one("div[data-is24-qa='attributes']") gives us our attributes_card.
  • attributes_card.find_all("dl") gives us all of our individual attributes.
  • We then filter out certain strings from our attributes to make them more readable (see the sketch after this list).
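
To see that cleanup logic on its own, here is a minimal sketch run against made-up attribute strings (the sample values are assumptions, not real listings):

# Example dl texts in the order they appear on a result card.
sample_attributes = ["1.250 € Kaltmiete", "54 m² Wohnfläche", "2 Zi."]

price = sample_attributes[0].replace("Kaltmiete", "").strip()
size = sample_attributes[1].replace("Wohnfläche", "").strip()

date_available = "n/a"
date_text = sample_attributes[2]
# Listings with no move-in date show the room count ("Zi.") in this slot instead.
if "Zi" not in date_text:
    date_available = date_text

print(price, size, date_available)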

Step 2: Add Pagination

As we discussed earlier in this article, adding pagination is a pretty simple process. As long as we're looking for page 2 or greater, we add the following to our URL: ?pagenumber={page_number+1}.

If we're on page 1, our URL looks like this:

https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten

All other pages will have a URL like this:

https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten?pagenumber={page_number+1}

We also need a function to scrape multiple pages. We'll call this one start_scrape().

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)

Here is how it all fits together now.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(search_info, location, page_number, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")

for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")

price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text

search_data = {
"name": name,
"price": price,
"size": size,
"date_available": date_available,
"url": link
}
print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, retries=3):

for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []

## Job Processes
for keyword in keyword_list:

start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")
  • Our parsing function now has logic to account for pagination.
  • start_scrape() now gives us the ability to call scrape_search_results() on multiple pages.

Step 3: Storing the Scraped Data

It's extremely important that we store our data: we need it on disk so humans can review it later, and so our listing scraper can read the file and know which listings to scrape.

To accomplish this, we need a DataPipeline and a dataclass to represent our search results.

Here is our dataclass. We call it SearchData.

@dataclass
class SearchData:
    name: str = ""
    price: str = ""
    size: str = ""
    date_available: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here is our DataPipeline. It opens a pipe to a CSV file. Then, we feed it SearchData objects. The pipeline will filter out duplicate objects by their name attribute. All non-duplicate items get saved to the CSV file.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

Here is our full code up to this point.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")

for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")

price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text

search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):

for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • Inside our main, we open a DataPipeline and pass it into start_scrape().
  • As we parse each result, we use it to create a SearchData object and pass that object into the DataPipeline.

Step 4: Adding Concurrency

Adding concurrency to our crawler is a pretty simple process. We built start_scrape() to crawl a list of pages using a for loop. When we add concurrency to our crawler, it will be able to crawl multiple pages at the same time (concurrently).

We accomplish this by replacing our for loop with a call to ThreadPoolExecutor.

Here is our refactored version of start_scrape().

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

Pay attention to the args in executor.map(); they replace our for loop.

  • scrape_search_results: the function we want to call on each available thread.
  • All other args are the args we pass into scrape_search_results. We pass them in as arrays, and executor.map() passes one element from each array into each call (see the standalone sketch after this list).
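
Here is a tiny sketch, separate from the scraper, showing how executor.map() lines those arrays up:

import concurrent.futures

def show(keyword, page):
    print(f"scraping page {page} for {keyword}")

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Equivalent to calling: show("muenchen", 0), show("muenchen", 1), show("muenchen", 2)
    executor.map(show, ["muenchen"] * pages, range(pages))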

Here is our full code; it's almost production ready.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")

for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")

price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text

search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 5: Bypassing Anti-Bots

The function below is called get_scrapeops_url(). It takes in a url and a location, and it spits out a proxied URL.

Take a look at the payload in the function below. These are all of the parameters that get sent to the ScrapeOps server to tell it what we want to do.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "render_js": True,
        "bypass": "generic_level_3",
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
  • "api_key": your ScrapeOps API key.
  • "url": the url of the website you want to scrape.
  • "render_js": we want ScrapeOps to open a real browser and render JavaScript content on the page.
  • "bypass": the level of anti-bot system we wish to bypass.
  • "country": the country we want to appear in.

Our production-ready crawler is available below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"render_js": True,
"bypass": "generic_level_3",
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")

for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")

price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text

search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

Now that we have a functional crawler, we need to use it! Let's test it out in production. We're going to crawl 3 pages.

Feel free to change any of the following settings in the main:

  • MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
  • MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
  • PAGES: Determines how many pages of search results to scrape for each search area.
  • LOCATION: Specifies the country you want to appear in while making your requests.
  • keyword_list: A list of search areas (state and city) that the script will crawl and then scrape.

Here is our main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "de"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = [{"state": "bayern", "city": "muenchen"}]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = f"{keyword['state']}-{keyword['city']}"

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

Here are the results from the crawl. It took 60.418 seconds to crawl 3 pages. This comes out to an average speed of 20.139 seconds per page. This takes so long because of the anti-bot bypass.

When Immobilienscout24 thinks it spots a bot, it sends a challenge to the browser. ScrapeOps solves the challenge for us and gives us access to the site.

Crawler Results Terminal


Build An Immobilienscout24 Scraper

Now that we're effectively crawling listings, we get a CSV file out of the crawl. Next, we need to write a scraper that reads this file. After reading the file, our scraper should go through and scrape individual cost information from each listing we saved during the crawl.


Step 1: Create Simple Listing Data Parser

Like before, we'll start with parsing. The snippet below contains a parsing function that will essentially be the backbone of our listing scraper.

def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
                price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
                    .replace("Kalkuliert von ImmoScout24", "").strip()
                additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
                heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
                total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()

                cost_data = {
                    "name": row["name"],
                    "cold_rent": cold_rent,
                    "price_per_m2": price_per_m2,
                    "additional_costs": additional_costs,
                    "total_cost": total_cost
                }

                print(cost_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

All of our cost elements are dd elements with slightly different classes. They're listed below, followed by a short lookup sketch.

  • cold_rent: is24qa-kaltmiete grid-item three-fifths
  • price_per_m2: is24qa-preism² grid-item three-fifths
  • additional_costs: is24qa-nebenkosten grid-item three-fifths
  • heating_costs: is24qa-heizkosten grid-item three-fifths
  • total_cost: is24qa-gesamtmiete grid-item three-fifths font-bold
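
Because these lookups only differ by class name, you could also express them as a small mapping. This is just an illustrative sketch (the cost_classes dict is not part of the scraper), where soup is the parsed listing page from the function above:

cost_classes = {
    "cold_rent": "is24qa-kaltmiete grid-item three-fifths",
    "price_per_m2": "is24qa-preism² grid-item three-fifths",
    "additional_costs": "is24qa-nebenkosten grid-item three-fifths",
    "heating_costs": "is24qa-heizkosten grid-item three-fifths",
    "total_cost": "is24qa-gesamtmiete grid-item three-fifths font-bold",
}

costs = {}
for field_name, css_class in cost_classes.items():
    element = soup.find("dd", class_=css_class)
    # Fall back to "n/a" if a field is missing from the page.
    costs[field_name] = element.text.strip() if element else "n/a"
print(costs)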

Step 2: Loading URLs To Scrape

In order to use the parsing function we just wrote, it needs URLs. To give it a list of URLs, we'll read the CSV file generated by the crawler.

Take a look at the function below, process_results().

  • It reads our CSV file into an array of dict objects.
  • Then, it iterates through and calls process_listing() on each row that we read into the array.
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_listing(row, location, retries=retries)

When we put it all together, we get a Python program that first performs a crawl, and then executes the listing scraper logic that we're writing right now.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"render_js": True,
"bypass": "generic_level_3",
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")

for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")

price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text

search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")

cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
.replace("Kalkuliert von ImmoScout24", "").strip()
additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()

cost_data = {
"name": row["name"],
"cold_rent": cold_rent,
"price_per_m2": price_per_m2,
"additional_costs": additional_costs,
"total_cost": total_cost
}

print(cost_data)
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_listing(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
  • process_results() reads our CSV file into an array of dict objects.
  • process_results() also iterates through that array and calls process_listing() on each row from the array.

Step 3: Storing the Scraped Data

We already have a DataPipeline class. All we need is another dataclass to pass into it. Then, we'll be able to generate a cost report for each listing we scrape. This one will be called CostData.

@dataclass
class CostData:
    name: str = ""
    cold_rent: str = ""
    price_per_m2: str = ""
    additional_costs: str = ""
    total_cost: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In the code below, we open a new DataPipeline in our parsing function. Then we pass CostData into it and close the pipeline once the parse is finished.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"render_js": True,
"bypass": "generic_level_3",
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class CostData:
name: str = ""
cold_rent: str = ""
price_per_m2: str = ""
additional_costs: str = ""
total_cost: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")

for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")

price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text

search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")

costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv")

cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
.replace("Kalkuliert von ImmoScout24", "").strip()
additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()

cost_data = CostData(
name=row["name"],
cold_rent=cold_rent,
price_per_m2=price_per_m2,
additional_costs=additional_costs,
total_cost=total_cost
)
costs_pipeline.add_data(cost_data)
costs_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_listing(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
  • We open a new DataPipeline within our parsing function.
  • We parse our data into a CostData object, pass it into the pipeline, and close the pipeline once the listing is parsed. The core pattern is summarized in the sketch below.
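
Stripped of the selectors and retry logic, the pipeline pattern inside process_listing() boils down to a few lines. This is only an illustrative sketch (parse_costs is a hypothetical helper, not part of the scraper above):

def parse_costs(row, soup):
    # One pipeline (and one CSV file) per listing
    costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv")
    cost_data = CostData(
        name=row["name"],
        cold_rent=soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
    )
    costs_pipeline.add_data(cost_data)   # queue the item (duplicates are dropped)
    costs_pipeline.close_pipeline()      # flush anything left in the queue to disk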

Step 4: Adding Concurrency

We'll add concurrency the same way we did before, with ThreadPoolExecutor.

In this next snippet, we once again replace a for loop with a call to ThreadPoolExecutor.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_listing,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )
  • process_listing is the function we want to call on each available thread.
  • All other arguments to executor.map() are the arguments we want to pass into process_listing. We once again pass them in as lists of equal length, and executor.map() hands one element from each list to each call of process_listing. See the sketch after this list.
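
If the multiple-iterable form of executor.map() is new to you, here is a minimal, standalone sketch of how the arguments line up (the worker function and its values are placeholders, not part of the scraper):

import concurrent.futures

def worker(row, location, retries):
    # Receives one element from each iterable passed to executor.map()
    return f"{row} | {location} | {retries}"

rows = ["row1", "row2", "row3"]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    for result in executor.map(worker, rows, ["de"] * len(rows), [3] * len(rows)):
        print(result)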

Step 5: Bypassing Anti-Bots

We've already got a solid function for bypassing Immobilienscout24's anti-bots: get_scrapeops_url(). We just need to use it within our parsing function. To do this, we change the line that fetches the response.

response = requests.get(get_scrapeops_url(url, location=location))
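
To make that concrete, here is a small sketch of the proxy URL that get_scrapeops_url() builds with urlencode (the API key and listing URL are placeholders; the real query string depends on your own values):

from urllib.parse import urlencode

payload = {
    "api_key": "your-super-secret-api-key",                  # placeholder
    "url": "https://www.immobilienscout24.de/expose/12345",  # placeholder listing URL
    "render_js": True,
    "bypass": "generic_level_3",
    "country": "de",
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))
# The listing URL gets percent-encoded into a single query parameter.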

Here is our finished product.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"render_js": True,
"bypass": "generic_level_3",
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class CostData:
name: str = ""
cold_rent: str = ""
price_per_m2: str = ""
additional_costs: str = ""
total_cost: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")

for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")

price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text

search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")

costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv")

cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
.replace("Kalkuliert von ImmoScout24", "").strip()
additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()

cost_data = CostData(
name=row["name"],
cold_rent=cold_rent,
price_per_m2=price_per_m2,
additional_costs=additional_costs,
total_cost=total_cost
)
costs_pipeline.add_data(cost_data)
costs_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_listing,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Time to run everything in production. Once again, feel free to change any of the following.

  • MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
  • MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
  • PAGES: Determines how many pages of Immobilienscout24 search results to scrape for each location.
  • LOCATION: Specifies the geographical location (country) used when routing requests through the proxy.
  • keyword_list: A list of location dicts (state and city) that the script will crawl and then scrape. See the example after this list.
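
For example, to crawl another city you could extend keyword_list like this (the second entry is only an illustration; the state and city slugs must match the ones Immobilienscout24 uses in its URLs):

## INPUT ---> List of locations to scrape
keyword_list = [
    {"state": "bayern", "city": "muenchen"},
    {"state": "berlin", "city": "berlin"}    # hypothetical extra entry
]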

If you need a refresher, here is our main block. We're running a 3-page crawl and then scraping costs for each of the individual results.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "de"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Here are our results. Our crawl generated a CSV file with 50 results. If you remember from earlier, the 3-page crawl took 60.418 seconds, while the full process took 318.882 seconds. That leaves 318.882 - 60.418 = 258.464 seconds for scraping the listings, or 258.464 / 50 listings = 5.169 seconds per listing.
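
If you want to reproduce that per-listing figure from your own run times, the arithmetic is straightforward:

# Back-of-the-envelope timing from the run above
total_runtime = 318.882    # seconds for the full process
crawl_runtime = 60.418     # seconds for the 3-page crawl
listings = 50

scrape_runtime = total_runtime - crawl_runtime     # 258.464 seconds
print(round(scrape_runtime / listings, 3))         # 5.169 seconds per listing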

(Screenshot: scraper performance results in the terminal)


Legal and Ethical Considerations

When we scrape the web, we need to be careful about a few things legally and ethically. Scraping public data is generally considered legal: if it's available publicly, it's fair game, and the data we scraped today was public.

Private data (data gated behind a login) is a completely different story. If you choose to scrape private data, you should consult an attorney to determine the legality of your scrape.

We also need to pay attention to both the site's Terms and Conditions and its robots.txt. Immobilienscout24 explicitly prohibits scraping, so we did violate their policies in this scrape.
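
If you want to check a site's robots.txt programmatically before you scrape, Python's standard library includes a parser. Here is a minimal sketch (the user agent string and the path being checked are just examples):

from urllib.robotparser import RobotFileParser

# Minimal sketch: ask robots.txt whether a path may be fetched by a given user agent
rp = RobotFileParser()
rp.set_url("https://www.immobilienscout24.de/robots.txt")
rp.read()
print(rp.can_fetch("MyScraperBot", "https://www.immobilienscout24.de/Suche/"))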

Violating terms like these can result in suspension or even a permanent ban from the site. You can view links to these policies below.


Conclusion

You now know how to scrape Immobilienscout24. You should have a solid understanding of parsing, pagination, data storage, concurrency, and proxy integration. You've also seen the power of ScrapeOps' bypass feature first hand.

Take this new knowledge and go build something cool! If you'd like to learn more about the tech used in this article, check out the links below.


More Python Web Scraping Guides

At ScrapeOps, we have tons of resources for everyone to learn from. Whether you're brand new to coding or a seasoned web developer, we have something for you.

Take a look at our Python Web Scraping Playbook. If you want more from our "How To Scrape" series, check out the articles below.