How to Scrape Immobilienscout24 With Requests and BeautifulSoup
Immobilienscout24 is a great place to look up rentals and real estate listings for sale in Germany.
Today, we're going to scrape rentals from Immobilienscout24. They use some of the most difficult anti-bot software we've run into during this series.
- TLDR: How to Scrape Immobilienscout24
- How To Architect Our Scraper
- Understanding How To Scrape Immobilienscout24
- Setting Up Our Immobilienscout24 Scraper
- Build An Immobilienscout24 Search Crawler
- Build An Immobilienscout24 Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Immobilienscout24
If you need to scrape Immobilienscout24, look no further! You can use our pre-built scraper below.
- Simply create a new folder with a config.json file.
- Inside the config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}
- Then, copy/paste the code below into a Python file.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"render_js": True,
"bypass": "generic_level_3",
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class CostData:
name: str = ""
cold_rent: str = ""
price_per_m2: str = ""
additional_costs: str = ""
total_cost: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")
for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")
price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text
search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv")
cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
.replace("Kalkuliert von ImmoScout24", "").strip()
additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()
cost_data = CostData(
name=row["name"],
cold_rent=cold_rent,
price_per_m2=price_per_m2,
additional_costs=additional_costs,
total_cost=total_cost
)
costs_pipeline.add_data(cost_data)
costs_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_listing,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To change your results, go ahead and change any of the following:
- MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
- MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
- PAGES: Determines how many pages of Immobilienscout24 search results to scrape for each keyword.
- LOCATION: Specifies the geographical location (country) the requests appear to come from.
- keyword_list: A list of state/city pairs for which the script will perform the search and subsequent scraping.
After you run the code, you will get a crawl report named after your search, for example bayern-muenchen.csv. Once this file pops up in your folder, you will also get a number of CSVs, each named COST-{name-of-property}.csv.
How To Architect Our Immobilienscout24 Scraper
We need to build two scrapers for this project: a search crawler and a listing scraper.
- The search crawler needs to perform a search and then save all of our results to a CSV file.
- The listing scraper will then read the CSV file, and scrape cost data for each of the listings we crawled.
Here is the build process for our search crawler:
- Parsing a search page.
- Controlling our search results with pagination.
- Storing our data.
- Concurrently running steps 1 through 3.
- Proxy Integration to bypass anti-bots.
We'll build the listing scraper using these steps:
- Parsing a listing page.
- Reading our CSV file.
- Storing our parsed listing data.
- Running steps 1 and 3 simultaneously with concurrency.
- Bypassing anti-bots with proxy integration.
Understanding How To Scrape Immobilienscout24
We need to get a better understanding of Immobilienscout24 and how it's laid out. We need to understand how to perform searches and where the data is located on each page.
In the next few sections, we're going to look at:
- How To Request Pages
- Extracting Data From These Pages
- How To Control Pagination
- How To Control Geolocation
Step 1: How To Request Immobilienscout24 Pages
Whenever you request a webpage, it begins with a GET.
- When you do it with your browser, under the hood, the browser makes a GET.
- We can do this manually with Python Requests.
- In both cases, we receive a response back in the form of an HTML page.
- Our browser receives this response and renders the page for us to see. Our scraper needs to dig through the HTML and find our data.
When we perform a search, our URL looks like this:
https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten
bayern (Bavaria) is our state, and muenchen (Munich) is the city we're searching.
So, when we format our URLs, they look like this:
https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten
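As a quick illustration, here is a bare-bones version of that GET with Python Requests. The search_info values below are just example placeholders; without the proxy integration we add later, a plain request like this will often get blocked.
import requests

# Example placeholders: Bavaria / Munich, the same search we use throughout this article
search_info = {"state": "bayern", "city": "muenchen"}

url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
response = requests.get(url)

# 200 means we got the page; anti-bot challenges usually come back with a different status code
print(response.status_code)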
If you click on a listing from this search, you'll get a page that looks like this. As you scroll down, you'll see a cost table as shown in the second screenshot.
Step 2: How To Extract Data From Immobilienscout24 Results and Pages
Now that we know what these pages look like, we need to take a look at how to extract their data. First, we'll take a look at the search results page. Then we'll look at the individual listing page.
In the search results, each listing is embedded within a div with the class result-list-entry__data. If we find all of these div elements, we can go through and pull the data from each one.
On our listings page, we want to find the costs section. All of our costs are embedded within dd elements on the page.
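To make those selectors concrete, here's a tiny, self-contained sketch. The sample_html string is a made-up stand-in for a real fetched page, but the classes are the ones we target throughout this article.
from bs4 import BeautifulSoup

# Made-up stand-in for a fetched search results page
sample_html = """
<div class="result-list-entry__data">
  <div class="result-list-entry__address font-ellipsis">Musterstrasse 1, Muenchen</div>
</div>
"""

soup = BeautifulSoup(sample_html, "html.parser")

# Every search result card sits in a div with this class
cards = soup.find_all("div", class_="result-list-entry__data")
for card in cards:
    # The address/name uses this class, just like in the real scraper
    name = card.find("div", class_="result-list-entry__address font-ellipsis").text
    print(name.strip())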
Step 3: How To Control Pagination
Pagination is very easy to control. You saw our search URL earlier:
https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten
Take a look at the URL for page 2:
https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten?pagenumber=2
The only major difference is ?pagenumber=2. One quirk unique to Immobilienscout24: passing ?pagenumber=1 actually trips their anti-bot system.
Take a look at the shot below.
When we're looking at page 1 of our search, we need to omit the pagenumber parameter. For all other pages, we need to include it.
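Here's a short sketch of that rule in code, using the same zero-indexed page_number convention our crawler uses later. The build_search_url() helper is just for illustration.
base_url = "https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten"

def build_search_url(base_url, page_number):
    # Page 1 (index 0) must omit the pagenumber parameter entirely
    if page_number == 0:
        return base_url
    # Pages 2 and up use ?pagenumber=2, ?pagenumber=3, and so on
    return f"{base_url}?pagenumber={page_number + 1}"

print(build_search_url(base_url, 0))  # ...wohnung-mieten
print(build_search_url(base_url, 1))  # ...wohnung-mieten?pagenumber=2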
Step 4: Geolocated Data
It's super easy to handle our geolocation using the ScrapeOps API. When using the ScrapeOps Proxy Aggregator, we can pass a country param to be routed through the country of our choice.
In this case, we want to appear in Germany, so we'll pass "country": "de".
You can view all of our available country codes here.
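Here's a minimal sketch of what that looks like in a proxy payload. It mirrors the get_scrapeops_url() function we build later; the API_KEY value here is just a placeholder for the key you load from config.json.
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder

payload = {
    "api_key": API_KEY,
    "url": "https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten",
    "country": "de",  # appear in Germany
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)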
Setting Up Our Immobilienscout24 Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir immobilienscout24-scraper
cd immobilienscout24-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build An Immobilienscout24 Search Crawler
Now, it's time to start actually building! In the next few sections, we're going to build our crawler. We'll go about this in the following steps:
- Building a Parser
- Adding Pagination
- Storing Our Data
- Adding Concurrency
- Proxy Integration
Step 1: Create Simple Search Data Parser
We'll start by writing a script with our basic structure and a parsing function. In the code below, we create our basic structure: error handling, a parsing function with retry logic, and a main block.
All of this is important, but if you're here to learn scraping, the parsing function is by far the most important.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(search_info, location, retries=3):
url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")
for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")
price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text
search_data = {
"name": name,
"price": price,
"size": size,
"date_available": date_available,
"url": link
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
card.find("div", class_="result-list-entry__address font-ellipsis").text
gives us the address orname
of each property.- We find the
href
withcard.find("a").get("href")
. - We check if our
href
includes ourprefix
:"https://www.immobilienscout24.de"
. If it does, this is an add so we skip it withcontinue
. card.select_one("div[data-is24-qa='attributes']")
gives us ourattributes_card
.attributes_card.find_all("dl")
gives us all of our individual attributes.- We then filter out certain strings from our attributes to make them more readable.
Step 2: Add Pagination
As we discussed earlier in this article, adding pagination is a pretty simple process. As long as we're looking for page 2 or greater, we add the following to our URL: ?pagenumber={page_number+1}.
If we're on page 1, our URL looks like this:
https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten
All other pages will have a URL like this:
https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten?pagenumber={page_number+1}
We also need a function to scrape multiple pages. We'll call this one start_scrape().
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
Here is how it all fits together now.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(search_info, location, page_number, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")
for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")
price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text
search_data = {
"name": name,
"price": price,
"size": size,
"date_available": date_available,
"url": link
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- Our parsing function now has logic to account for pagination.
- start_scrape() now gives us the ability to call scrape_search_results() on multiple pages.
Step 3: Storing the Scraped Data
It's extremely important that we store our data. We need to store it so humans can review it later. We also need to store it so our listing scraper can read the file and know which listings to scrape.
To accomplish this, we need a DataPipeline and a dataclass to represent our search results.
Here is our dataclass. We call it SearchData.
@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is our DataPipeline. It opens a pipe to a CSV file. Then, we feed it SearchData objects. The pipeline filters out duplicate objects by their name attribute. All non-duplicate items get saved to the CSV file.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
Here is our full code up to this point.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")
for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")
price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text
search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- Inside our main, we open a DataPipeline and pass it into start_scrape().
- As we parse each result, we use it to create a SearchData object and pass that object into the DataPipeline.
Step 4: Adding Concurrency
Adding concurrency to our crawler is a pretty simple process. We built start_scrape() to crawl a list of pages using a for loop. When we add concurrency, our crawler will be able to crawl multiple pages at the same time (concurrently).
We accomplish this by replacing our for loop with a call to ThreadPoolExecutor.
Here is our refactored version of start_scrape().
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
Pay attention to the args in executor.map(); they replace our for loop.
- scrape_search_results: the function we want to call on each available thread.
- All other args are the args we pass into scrape_search_results. We pass them in as arrays, and executor.map() hands one element from each array into scrape_search_results per call (see the short sketch after this list).
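If the multiple-iterables pattern looks unfamiliar, here's a tiny standalone sketch of how executor.map() lines them up. The greet() function and the sample lists are purely illustrative.
import concurrent.futures

def greet(name, city, page):
    return f"{name} | {city} | page {page}"

names = ["crawler"] * 3
cities = ["muenchen"] * 3

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Call i gets (names[i], cities[i], i), just like our scrape_search_results() args
    for result in executor.map(greet, names, cities, range(3)):
        print(result)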
Here is our full code; it's almost production ready.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")
for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")
price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text
search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 5: Bypassing Anti-Bots
The function below is called get_scrapeops_url(). It takes in a URL and a location, and spits out a proxied URL.
Take a look at the payload in the function below. These are all of the parameters that get sent to the ScrapeOps server to tell it what we want to do.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"render_js": True,
"bypass": "generic_level_3",
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
"api_key"
: your ScrapeOps API key."url"
: the url of the website you want to scrape."render_js"
: we want ScrapeOps to open a real browser and render JavaScript content on the page."bypass"
: the level of anti-bot system we wish to bypass."country"
: the country we want to appear in.
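In practice, we just wrap whichever URL we want to fetch. Here's a quick usage sketch, assuming the get_scrapeops_url() function above and an API_KEY loaded from config.json; the target URL is only an example.
import requests

target_url = "https://www.immobilienscout24.de/Suche/de/bayern/muenchen/wohnung-mieten"
# The proxied request goes to ScrapeOps, which fetches the target page for us
response = requests.get(get_scrapeops_url(target_url, location="de"))
print(response.status_code)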
Our production ready crawler is available to use below.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"render_js": True,
"bypass": "generic_level_3",
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")
for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")
price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text
search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Now that we have a functional crawler, we need to use it! Let's test it out in production. We're going to crawl 3 pages.
Feel free to change any of the following settings in the main:
- MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
- MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
- PAGES: Determines how many pages of Immobilienscout24 search results to scrape for each keyword.
- LOCATION: Specifies the geographical location (country) the requests appear to come from.
- keyword_list: A list of state/city pairs for which the script will perform the search and subsequent scraping.
Here is our main.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Here are the results from the crawl. It took 60.418 seconds to crawl 3 pages. This comes out to an average speed of 20.139 seconds per page. This takes so long because of the anti-bot bypass.
When Immobilienscout24 thinks it spots a bot, it sends a challenge to the browser. ScrapeOps solves the challenge for us and gives us access to the site.
Build An Immobilienscout24 Scraper
Now that we're effectively crawling listings, we get a CSV file out of the crawl. Next, we need to write a scraper that reads this file. After reading the file, our scraper should go through and scrape individual cost information from each listing we saved during the crawl.
Step 1: Create Simple Listing Data Parser
Like before, we'll start with parsing. The snippet below contains a parsing function that will essentially be the backbone of our listing scraper.
def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
.replace("Kalkuliert von ImmoScout24", "").strip()
additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()
cost_data = {
"name": row["name"],
"cold_rent": cold_rent,
"price_per_m2": price_per_m2,
"additional_costs": additional_costs,
"total_cost": total_cost
}
print(cost_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
All of our cost elements are dd elements with slightly different classes. You can see them listed below.
- cold_rent: is24qa-kaltmiete grid-item three-fifths
- price_per_m2: is24qa-preism² grid-item three-fifths
- additional_costs: is24qa-nebenkosten grid-item three-fifths
- heating_costs: is24qa-heizkosten grid-item three-fifths
- total_cost: is24qa-gesamtmiete grid-item three-fifths font-bold
Step 2: Loading URLs To Scrape
In order to use the parsing function we just wrote, it needs URLs. To give it a list of URLs, we'll read the CSV file that our crawler generated.
Take a look at the function below, process_results().
- It reads our CSV file into an array of dict objects.
- Then, it iterates through the array and calls process_listing() on each row.
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_listing(row, location, retries=retries)
When we put it all together, we get a Python program that first performs a crawl, and then executes the listing scraper logic that we're writing right now.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"render_js": True,
"bypass": "generic_level_3",
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")
for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")
price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text
search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
.replace("Kalkuliert von ImmoScout24", "").strip()
additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()
cost_data = {
"name": row["name"],
"cold_rent": cold_rent,
"price_per_m2": price_per_m2,
"additional_costs": additional_costs,
"total_cost": total_cost
}
print(cost_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_listing(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- process_results() reads our CSV file into an array of dict objects.
- process_results() also iterates through that array and calls process_listing() on each row from the array.
Step 3: Storing the Scraped Data
We already have a DataPipeline class. All we need is another dataclass to pass into it. Then, we'll be able to generate a cost report for each listing we scrape. This one will be called CostData.
@dataclass
class CostData:
name: str = ""
cold_rent: str = ""
price_per_m2: str = ""
additional_costs: str = ""
total_cost: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
In the code below, we open a new DataPipeline inside our parsing function. Then we pass CostData into it and close the pipeline once the parse is finished.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"render_js": True,
"bypass": "generic_level_3",
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class CostData:
name: str = ""
cold_rent: str = ""
price_per_m2: str = ""
additional_costs: str = ""
total_cost: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")
for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")
price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text
search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv")
cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
.replace("Kalkuliert von ImmoScout24", "").strip()
additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()
cost_data = CostData(
name=row["name"],
cold_rent=cold_rent,
price_per_m2=price_per_m2,
additional_costs=additional_costs,
total_cost=total_cost
)
costs_pipeline.add_data(cost_data)
costs_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_listing(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- We open a `DataPipeline` within our parsing function.
- We parse our data into `CostData` and pass it into the pipeline. A condensed sketch of this pattern follows below.
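If you strip the HTML parsing away, the pattern boils down to the minimal sketch below. It reuses the `DataPipeline` and `CostData` classes from the code above; the listing row and the cost values are placeholders for illustration, not real page data.
def save_costs_example():
    # A fake listing row standing in for one row of our crawl CSV
    row = {"name": "Example Listing, 80331 München"}
    # One pipeline per listing, writing to its own cost CSV
    costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv")
    # Placeholder values. In the real scraper these come from the parsed page.
    cost_data = CostData(
        name=row["name"],
        cold_rent="1.200 €",
        price_per_m2="15 €/m²",
        additional_costs="250 €",
        total_cost="1.450 €"
    )
    # add_data() queues the item; close_pipeline() flushes the queue to the CSV
    costs_pipeline.add_data(cost_data)
    costs_pipeline.close_pipeline()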
Step 4: Adding Concurrency
We'll add concurrency the same way we did before, with `ThreadPoolExecutor`.
In this next snippet, we once again replace a `for` loop with a call to `ThreadPoolExecutor`.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_listing,
reader,
[location] * len(reader),
[retries] * len(reader)
)
- `process_listing` is the function we want to call on each available thread.
- All other args to `executor.map()` are the args we wish to pass into `process_listing`. We once again pass them in as arrays, and `executor.map()` passes them into `process_listing` one element at a time (see the short standalone sketch after this list).
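Here is a tiny standalone example of how `executor.map()` lines multiple iterables up positionally. The `greet()` function and its sample values are made up, standing in for `process_listing` and our real arguments.
import concurrent.futures

# Toy stand-in for process_listing
def greet(row, location, retries):
    return f"{row} | location={location} | retries={retries}"

rows = ["listing-1", "listing-2", "listing-3"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each iterable supplies one positional argument per call:
    # greet("listing-1", "de", 3), greet("listing-2", "de", 3), ...
    results = executor.map(
        greet,
        rows,
        ["de"] * len(rows),
        [3] * len(rows)
    )
    for result in results:
        print(result)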
Step 5: Bypassing Anti-Bots
We've already got a pretty good function for bypassing Immobilienscout24's anti-bots. We just need to use it within our parsing function. To do this, we'll slightly change our `response` line.
response = requests.get(get_scrapeops_url(url, location=location))
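If you're curious what that wrapped URL actually looks like, you can print the output of `get_scrapeops_url()`. The snippet below is a standalone copy of the helper from earlier with a placeholder API key and a made-up listing URL.
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder key

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "render_js": True,
        "bypass": "generic_level_3",
        "country": location,
    }
    # urlencode() escapes the target URL so it can ride along as a query param
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Made-up listing URL, just for illustration
listing_url = "https://www.immobilienscout24.de/expose/123456789"
print(get_scrapeops_url(listing_url, location="de"))
# https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.immobilienscout24.de%2Fexpose%2F123456789&render_js=True&bypass=generic_level_3&country=de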
Here is our finished product.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"render_js": True,
"bypass": "generic_level_3",
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: str = ""
size: str = ""
date_available: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class CostData:
name: str = ""
cold_rent: str = ""
price_per_m2: str = ""
additional_costs: str = ""
total_cost: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
base_url = f"https://www.immobilienscout24.de/Suche/de/{search_info['state']}/{search_info['city']}/wohnung-mieten"
url = ""
if page_number != 0:
url = f"{base_url}?pagenumber={page_number+1}"
else:
url = base_url
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="result-list-entry__data")
if not div_cards:
raise Exception("Listings failed to load!")
for card in div_cards:
name = card.find("div", class_="result-list-entry__address font-ellipsis").text
href = card.find("a").get("href")
link = ""
prefix = "https://www.immobilienscout24.de"
if prefix in href:
continue
else:
link = f"{prefix}{href}"
attributes_card = card.select_one("div[data-is24-qa='attributes']")
attributes = attributes_card.find_all("dl")
price = attributes[0].text.replace("Kaltmiete", "")
size = attributes[1].text.replace("Wohnfläche", "")
date_available = "n/a"
date_text = attributes[2].find("dd").text
if "Zi" not in date_text:
date_available = date_text
search_data = SearchData(
name=name,
price=price,
size=size,
date_available=date_available,
url=link
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
costs_pipeline = DataPipeline(csv_filename=f"COST-{row['name']}.csv")
cold_rent = soup.find("dd", class_="is24qa-kaltmiete grid-item three-fifths").text.strip()
price_per_m2 = soup.find("dd", class_="is24qa-preism² grid-item three-fifths").text\
.replace("Kalkuliert von ImmoScout24", "").strip()
additional_costs = soup.find("dd", class_="is24qa-nebenkosten grid-item three-fifths").text.strip()
heating_costs = soup.find("dd", class_="is24qa-heizkosten grid-item three-fifths").text.strip()
total_cost = soup.find("dd", class_="is24qa-gesamtmiete grid-item three-fifths font-bold").text.strip()
cost_data = CostData(
name=row["name"],
cold_rent=cold_rent,
price_per_m2=price_per_m2,
additional_costs=additional_costs,
total_cost=total_cost
)
costs_pipeline.add_data(cost_data)
costs_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_listing,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Time to run everything in production. Once again, feel free to change any of the following.
- `MAX_THREADS`: Controls the number of threads that the program will use for concurrent execution.
- `MAX_RETRIES`: Defines the number of times the scraper will retry a failed request before giving up.
- `PAGES`: Determines how many pages of search results to crawl for each location.
- `LOCATION`: Specifies the country that the ScrapeOps proxy routes our requests through.
- `keyword_list`: The list of state/city pairs that the script will crawl and then scrape. A hypothetical tweaked configuration follows this list.
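For instance, a larger run that also covers Berlin might use the settings below. This is a hypothetical configuration that we did not run; it only shows where the knobs are.
# Hypothetical settings, adjust to your own needs
MAX_RETRIES = 3
MAX_THREADS = 10        # more threads for the scraping step
PAGES = 5               # crawl 5 pages of results per location
LOCATION = "de"         # route requests through German IPs

# Crawl rentals in both Munich and Berlin
keyword_list = [
    {"state": "bayern", "city": "muenchen"},
    {"state": "berlin", "city": "berlin"},
]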
If you need a refresher, here is our `main`. We're running a 3-page crawl and then scraping costs on all of the individual results.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "de"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = [{"state": "bayern", "city": "muenchen"}]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = f"{keyword['state']}-{keyword['city']}"
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here are our results. Our crawl generated a CSV file with 50 results. If you remember from earlier, the 3-page crawl took 60.418 seconds, and the full process took 318.882 seconds. 318.882 - 60.418 = 258.464 seconds spent scraping individual listings. 258.464 seconds / 50 listings = 5.169 seconds per listing.
Legal and Ethical Considerations
When we scrape the web, we need to be cautious about a few things, both legally and ethically. Scraping public data is generally considered legal: if it's available publicly, it's fair game, and the data we scraped today was public.
Private data (data gated behind a login) is a completely different story. If you choose to scrape private data, you should consult an attorney to determine the legality of your scrape.
We also need to pay attention to a site's Terms and Conditions and its `robots.txt`. Immobilienscout24 explicitly prohibits scraping, so we did technically violate their policies in this scrape.
Violating terms like these can result in suspension or even a permanent ban from the site. You can view links to these policies below.
Conclusion
You now know how to scrape Immobilienscout24. You should have a solid understanding of parsing, pagination, data storage, concurrency, and proxy integration. You've also seen the power of the ScrapeOps `bypass` feature firsthand.
Take this new knowledge and go build something cool! If you'd like to learn more about the tech used in this article, check out the links below.
More Python Web Scraping Guides
At ScrapeOps, we have tons of resources for everyone to learn from. Whether you're brand new to coding, or you're a seasoned web developer, we have something for you.
Take a look at our Python Web Scraping Playbook. If you want more from our "How To Scrape" series, check out the articles below.