
How to Scrape Zillow With Python Requests and BeautifulSoup

When you're looking to buy a house in the US, Zillow is perhaps the best-known place to look. On Zillow, we can find listings for houses with all sorts of details about them, and we can even take virtual tours. However, Zillow is notoriously difficult to scrape because of its use of anti-bots.

In this article, we're going to scrape tons of important data from Zillow.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Zillow

Need to scrape Zillow? We've got you covered.

  1. Create a new project folder.
  2. Inside that project folder, add a config.json file with your API key and then add this script.
  3. Run it and you're good to go!

First, it will generate a CSV file based on your search. If you searched for houses in "pr" (Puerto Rico), it spits out a file called pr.csv.

It then reads this file and creates an individual report on each house from pr.csv.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class PropertyData:
name: str = ""
price: int = 0
time_on_zillow: str = ""
views: int = 0
saves: int = 0


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")

for script_tag in script_tags:
json_data = json.loads(script_tag.text)
if json_data["@type"] != "BreadcrumbList":
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_property(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

price_holder = soup.select_one("span[data-testid='price']")
price = int(price_holder.text.replace("$", "").replace(",", ""))

info_holders = soup.select("dt")
time_listed = info_holders[0].text
views = int(info_holders[2].text.replace(",", ""))
saves = info_holders[4].text

property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")

property_data = PropertyData(
name=row["name"],
price=price,
time_on_zillow=time_listed,
views=views,
saves=saves
)
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_property,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Feel free to change any of the following constants in main:

  • MAX_THREADS: Determines the maximum number of threads used for concurrent scraping and processing.
  • MAX_RETRIES: Sets the maximum number of retries for each request in case of failure (e.g., network issues, server errors).
  • PAGES: Specifies the number of pages to scrape for each keyword. Each page contains multiple property listings.
  • LOCATION: Defines the geographical location for the scraping. This parameter is used to adjust the proxy location to simulate requests from a specific country.
  • keyword_list: A list of keywords representing different geographical areas or search terms on Zillow. Each keyword triggers a separate scraping job. ("pr" is Puerto Rico; if you want to scrape Michigan, add "mi".)

How To Architect Our Zillow Scraper

Like many other scrapers from our "How To Scrape" series, our Zillow project will consist of two scrapers:

  1. a crawler: Our crawler will perform a search for houses in a specific area and spit out a CSV report on all the properties it finds.

  2. a scraper: The scraper will then read the CSV file and run an individual scrape on each property from the list.

Throughout the development process we'll use the following concepts in our design:

  • Parsing to extract valuable data from webpages.
  • Pagination to get our results in batches.
  • Data Storage to generate CSV reports from that data.
  • Concurrency to handle all the above steps on multiple pages simultaneously.
  • Proxy Integration to get past anti-bot systems and anything else that might get in the way of our scraper.

Understanding How To Scrape Zillow

Before we dive head first into coding, we need to get a better understanding of Zillow at a high level. In the following sections, we're going to discuss how to request pages, how to extract data, how to control pagination, and how to handle geolocation.


Step 1: How To Request Zillow Pages

We can't get very far if we can't request pages. We can request pages on Zillow with a simple GET request. A GET request does exactly what it sounds like: it gets information. When you look up a site in your browser, you're actually performing a GET request.

Take a look at the image below, specifically our address bar. Here is our URL:

https://www.zillow.com/pr/2_p/

pr is our location.

Zillow Search Page
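
For example, here's a minimal sketch of requesting a search page with Requests (the location slug and page number are just placeholders). Keep in mind that a bare request like this will often get blocked by Zillow's anti-bots, which is exactly why we add a proxy later in this article:

import requests

# Example: fetch page 2 of the Puerto Rico ("pr") search results.
url = "https://www.zillow.com/pr/2_p/"
response = requests.get(url)

# 200 means we got the page; anything else usually means we were blocked.
print(response.status_code)
print(f"{len(response.text)} characters of HTML")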

When we look at a specific home, we get a pop-up with the home's information. No worries, we still get a URL that we can work with.

For the house below, our URL is

https://www.zillow.com/homedetails/459-Carr-Km-7-2-Int-Bo-Arenales-Aguadilla-PR-00603/363559698_zpid/

Building these URLs from scratch would be pretty difficult. Luckily for us, we can scrape them during our initial crawl.

Zillow Property Page


Step 2: How To Extract Data From Zillow Results and Pages

Extracting data from Zillow can be a bit tricky. For search results, our data is actually embedded in a JSON blob. For individual property pages, it's nested within the HTML elements. Let's take a closer look at exactly which data we'll be extracting.

Here is the search page and the JSON blob inside it.

Inspect Zillow Search Results Page

Here is a look at some of the HTML we want to parse for an individual property page.

Inspect Zillow Property Page
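
To make that concrete, here's a small sketch of pulling those JSON blobs out of a page you've already downloaded. It assumes the raw HTML is sitting in a variable called html_text; the selector matches the one we use later in the article:

import json
from bs4 import BeautifulSoup

def extract_json_blobs(html_text):
    # Zillow embeds listing data in <script type="application/ld+json"> tags.
    soup = BeautifulSoup(html_text, "html.parser")
    blobs = []
    for script_tag in soup.select("script[type='application/ld+json']"):
        blobs.append(json.loads(script_tag.text))
    return blobs

# Usage sketch: html_text would come from a request like the one in Step 1.
# blobs = extract_json_blobs(html_text)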


Step 3: How To Control Pagination

Pagination might seem a bit cryptic, but take a closer look at our URL from before:

https://www.zillow.com/pr/2_p/

2_p actually denotes our page number, 2.

If we want to search for page 1, our URL is

https://www.zillow.com/pr/1_p/

If we want page 3, we would end the URL with 3_p.
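
As a quick sketch, here's how we might build a batch of those paginated URLs with Python's range(). Because range() starts at 0, we add 1 before dropping the page number into the URL:

keyword = "pr"
pages = 3

# range(3) yields 0, 1, 2, which map to Zillow pages 1_p, 2_p, 3_p.
urls = [f"https://www.zillow.com/{keyword}/{page + 1}_p/" for page in range(pages)]
for url in urls:
    print(url)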


Step 4: Geolocated Data

For geolocated data, we'll be using the ScrapeOps API and our keyword_list. In the keyword_list, we'll hold the locations we'd like to scrape.

When interacting with the ScrapeOps API, we'll pass in a country param as well. country will not have any effect on our actual search results, but instead it will route us through a server in whichever country we specify.

For instance, if we want to appear in the US, we'd pass us in as our country.
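
For instance, a keyword_list and proxy location might look like the sketch below ("pr" and "mi" are the search keywords, while "us" only controls which country our requests appear to come from):

# Zillow location slugs we want to crawl: Puerto Rico and Michigan.
keyword_list = ["pr", "mi"]

# Proxy routing only; "us" routes our requests through a US server
# and does not change the search results themselves.
LOCATION = "us"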


Setting Up Our Zillow Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir zillow-scraper

cd zillow-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4
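
The scripts in this article also expect a config.json file in the project folder holding your ScrapeOps API key (they read config["api_key"] at startup). Here's one hedged way to create it from Python, assuming the file only needs that single field; you can just as easily write it by hand:

import json

# Replace the placeholder with your real ScrapeOps API key.
config = {"api_key": "YOUR-SCRAPEOPS-API-KEY"}

with open("config.json", "w") as config_file:
    json.dump(config, config_file, indent=2)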

Build A Zillow Search Crawler

Step 1: Create Simple Search Data Parser

We'll start off by building a simple parser. The job of the parser is relatively straightforward. Our parser needs to perform a search and extract data from the search results. This is the bedrock of everything else we'll add into our script.

While the script below adds some basic structure (logging, error handling, retry logic), what you should pay attention to is the parsing logic inside of our scrape_search_results() function.

Take a look at the script so far.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.zillow.com/{keyword}/"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")

for script_tag in script_tags:
json_data = json.loads(script_tag.text)
if json_data["@type"] != "BreadcrumbList":
search_data = {
"name": json_data["name"],
"property_type": json_data["@type"],
"street_address": json_data["address"]["streetAddress"],
"locality": json_data["address"]["addressLocality"],
"region": json_data["address"]["addressRegion"],
"postal_code": json_data["address"]["postalCode"],
"url": json_data["url"]
}

print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")

In our parsing function, we do the following things.

  • Find all of our JSON blobs with script_tags = soup.select("script[type='application/ld+json']")
  • Iterate through the blobs.
  • For each blob that does not have a "@type" of "BreadcrumbList", we parse its data:
    • name
    • property_type
    • street_address
    • locality
    • region
    • postal_code
    • url
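
To picture what the parser is working with, here's a simplified, hypothetical example of the kind of blob we keep. The field names match what our parser reads, but the values are made up:

import json

# Hypothetical blob, shaped like the listing data our parser reads.
raw_blob = """
{
    "@type": "SingleFamilyResidence",
    "name": "123 Example St, Aguadilla, PR 00603",
    "address": {
        "streetAddress": "123 Example St",
        "addressLocality": "Aguadilla",
        "addressRegion": "PR",
        "postalCode": "00603"
    },
    "url": "https://www.zillow.com/homedetails/example-home/00000000_zpid/"
}
"""

json_data = json.loads(raw_blob)
if json_data["@type"] != "BreadcrumbList":
    print(json_data["name"], json_data["address"]["postalCode"])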

Step 2: Add Pagination

As we discussed earlier, we can add pagination by constructing our URL like so:

https://www.zillow.com/{keyword}/{page_number+1}_p/
  • As you know, keyword is the location we'd like to search.
  • {page_number+1}_p, denotes our page number. We use page_number+1 because we'll be using Python's range() function to create our page list.
  • range() starts counting at zero and Zillow starts our pages at 1. So, we add 1 to our page when we pass it into the URL.

We also added a start_scrape() function to support the pagination we just added.

def start_scrape(keyword, pages, location, max_threads=5, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)

Here is our updated code. All in all, it isn't much different from before.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")

for script_tag in script_tags:
json_data = json.loads(script_tag.text)
if json_data["@type"] != "BreadcrumbList":
search_data = {
"name": json_data["name"],
"property_type": json_data["@type"],
"street_address": json_data["address"]["streetAddress"],
"locality": json_data["address"]["addressLocality"],
"region": json_data["address"]["addressRegion"],
"postal_code": json_data["address"]["postalCode"],
"url": json_data["url"]
}

print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, max_threads=5, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

start_scrape(keyword, PAGES, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")

Step 3: Storing the Scraped Data

When we scrape our data, we need to store it. If we didn't, there wouldn't be a point in scraping it to begin with. When we store our data, we can review it and we can also allow other functions or programs to read it.

This lays the groundwork for our scraper, which will read the stored CSV file and then look up individual data about each property in it.

First, we need a SearchData class. This class simply holds data.

@dataclass
class SearchData:
    name: str = ""
    property_type: str = ""
    street_address: str = ""
    locality: str = ""
    region: str = ""
    postal_code: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here is our DataPipeline. It takes in a dataclass (such as SearchData), filters out duplicates, and pipes the remaining data to a CSV file.

Additionally, our pipeline writes the file safely. If the CSV already exists, we append to it; otherwise, the pipeline creates it.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
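
As a quick usage sketch (assuming SearchData is defined as shown earlier), this is roughly how the pipeline gets fed and closed. The crawler below does exactly this inside scrape_search_results():

pipeline = DataPipeline(csv_filename="example.csv")

# Duplicate names get logged and dropped; everything else is queued for the CSV.
pipeline.add_data(SearchData(name="123 Example St", region="PR"))
pipeline.add_data(SearchData(name="123 Example St", region="PR"))  # dropped as a duplicate

# Flush whatever is still sitting in the queue.
pipeline.close_pipeline()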

Here is our code now that it's been fully updated for storage.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")

for script_tag in script_tags:
json_data = json.loads(script_tag.text)
if json_data["@type"] != "BreadcrumbList":
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 4: Adding Concurrency

To add concurrency, we'll use ThreadPoolExecutor. We'll also add a max_threads argument to start_scrape(). Take a look at the snippet below.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

Pay close attention to the arguments we pass into executor.map():

  • scrape_search_results is the function we'd like to run on each thread.
  • All other arguments are the arguments that we pass into scrape_search_results()
  • Each argument after the first gets passed in as an array.
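
If the way executor.map() pairs those lists up feels unclear, this tiny standalone sketch (with a made-up fake_scrape() function) shows the idea: the lists are lined up element by element and the function is called once per position, with the calls spread across the thread pool:

import concurrent.futures

def fake_scrape(keyword, page):
    print(f"scraping {keyword}, page {page + 1}")

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Calls fake_scrape("pr", 0), fake_scrape("pr", 1), fake_scrape("pr", 2)
    # across the thread pool.
    executor.map(fake_scrape, ["pr"] * 3, range(3))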

Here is our full code as of now.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")

for script_tag in script_tags:
json_data = json.loads(script_tag.text)
if json_data["@type"] != "BreadcrumbList":
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 5: Bypassing Anti-Bots

When we scrape the web, we tend to run into anti-bots. Anti-bots are software designed to detect malware and block it from accessing a site. Our scraper isn't malware, but it doesn't look human at all and anti-bots tend to flag this.

Take a look at the snippet below; this is where we unleash the power of the ScrapeOps Proxy API.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

Arguments we pass into the ScrapeOps API:

  • "api_key": our ScrapeOps API key.
  • "url": the url of the site we'd like to scrape.
  • "country": the country we'd like to be routed through.
  • "residential": a boolean value. If we set this to True, we're telling ScrapeOps to give us a residential IP address which decreases our likelihood of getting blocked.
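
To see what actually goes over the wire, here's a quick sketch of calling get_scrapeops_url(). The target URL ends up URL-encoded inside the proxy URL's query string (the output below is approximate, with the API key omitted):

# Assumes get_scrapeops_url() and API_KEY are defined as above.
target_url = "https://www.zillow.com/pr/1_p/"
proxied_url = get_scrapeops_url(target_url, location="us")

print(proxied_url)
# Roughly: https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.zillow.com%2Fpr%2F1_p%2F&country=us&residential=True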

Here is our production ready code.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")

excluded_types = ["BreadcrumbList", "Event"]
for script_tag in script_tags:
json_data = json.loads(script_tag.text)

if json_data["@type"] not in excluded_types:
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

We're now ready for a production run. Take a look at our main.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

We'll change PAGES to 5, and LOCATION to "us". Feel free to change any of these constants in the main to tweak your results.

Here are our results.

Crawler Performance Results

Our crawler parsed the results at approximately one second per page. This is great!


Build A Zillow Scraper

Our crawler is now performing searches, parsing results and saving the data. Now, it's time to build the scraper. Our scraper is going to do the following:

  • Read a CSV file.
  • Parse all properties from the CSV file.
  • Save their data.
  • Perform the actions above concurrently.
  • Integrate with a proxy.

Step 1: Create Simple Business Data Parser

Like before, we'll start with a basic parsing function. The overall structure looks a lot like our first parsing function, but there are some key differences.

Mainly, instead of pulling JSON data nested within the page, we're going to pull our data from HTML elements on the page.

Take a look at the function below.

def process_property(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")

                price_holder = soup.select_one("span[data-testid='price']")
                price = int(price_holder.text.replace("$", "").replace(",", ""))

                info_holders = soup.select("dt")
                time_listed = info_holders[0].text
                views = int(info_holders[2].text.replace(",", ""))
                saves = info_holders[4].text

                property_data = {
                    "name": row["name"],
                    "price": price,
                    "time_on_zillow": time_listed,
                    "views": views,
                    "saves": saves
                }

                print(property_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

Things you need to pay attention to here:

  • "span[data-testid='price']" is the CSS selector of our price_holder.
  • int(price_holder.text.replace("$", "").replace(",", "")) gives us our actual price and converts it to an integer.
  • We find all of our info_holders with soup.select("dt")
  • We pull time_listed, views, and saves from the info_holders array.
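
The price cleanup is worth seeing on its own. Here's a small standalone sketch of the same string-to-integer conversion; the sample text is made up:

# Example price text as it might appear inside span[data-testid='price'].
price_text = "$1,250,000"

# Strip the dollar sign and thousands separators, then convert to an integer.
price = int(price_text.replace("$", "").replace(",", ""))
print(price)  # 1250000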

Step 2: Loading URLs To Scrape

In order to use our parsing function, we need to be able to feed URLs into it. For each row in the file, we run process_property() on that row. Later, we'll add concurrency to this function just like we did earlier.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_property(row, location, retries=retries)

Here is our fully updated code.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")

excluded_types = ["BreadcrumbList", "Event"]
for script_tag in script_tags:
json_data = json.loads(script_tag.text)
if json_data["@type"] not in excluded_types:
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_property(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

price_holder = soup.select_one("span[data-testid='price']")
price = int(price_holder.text.replace("$", "").replace(",", ""))

info_holders = soup.select("dt")
time_listed = info_holders[0].text
views = int(info_holders[2].text.replace(",", ""))
saves = info_holders[4].text

property_data = {
"name": row["name"],
"price": price,
"time_on_zillow": time_listed,
"views": views,
"saves": saves
}

print(property_data)
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_property(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Our code now reads the CSV file generated by the crawler and runs process_property() on each row from the file.


Step 3: Storing the Scraped Data

As you already know at this point, we need to store the data we scrape. For the most part, we've already got everything in place to do this.

We simply need to add a PropertyData class. This class will act much like the SearchData class from before and it will also get passed into a DataPipeline.

Here is our PropertyData class.

@dataclass
class PropertyData:
    name: str = ""
    price: int = 0
    time_on_zillow: str = ""
    views: int = 0
    saves: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In our full code below, instead of printing the data, we instantiate a PropertyData object and then pass it into a DataPipeline.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class PropertyData:
name: str = ""
price: int = 0
time_on_zillow: str = ""
views: int = 0
saves: int = 0


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")
excluded_types = ["BreadcrumbList", "Event"]
for script_tag in script_tags:
json_data = json.loads(script_tag.text)
if json_data["@type"] not in excluded_types:
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_property(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

price_holder = soup.select_one("span[data-testid='price']")
price = int(price_holder.text.replace("$", "").replace(",", ""))

info_holders = soup.select("dt")
time_listed = info_holders[0].text
views = int(info_holders[2].text.replace(",", ""))
saves = info_holders[4].text

property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")

property_data = PropertyData(
name=row["name"],
price=price,
time_on_zillow=time_listed,
views=views,
saves=saves
)
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_property(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 4: Adding Concurrency

With a small refactor, we can now add concurrency. Once again, we'll replace our for loop with ThreadPoolExecutor. Take a look at the new function.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_property,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )

Quite similar to when we added concurrency earlier, executor.map() takes the following arguments.

  • process_property is the function we want to run on each thread.
  • reader is the array of property rows from our CSV file.
  • All other arguments are passed in as arrays, just like before.

Step 5: Bypassing Anti-Bots

Once again, we want to get past anti-bots and anything else that might block us. We only need to change one line.

response = requests.get(get_scrapeops_url(url, location=location))

Here is our finalized code, ready for production.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class PropertyData:
name: str = ""
price: int = 0
time_on_zillow: str = ""
views: int = 0
saves: int = 0


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")
excluded_types = ["BreadcrumbList", "Event"]
for script_tag in script_tags:
json_data = json.loads(script_tag.text)
if json_data["@type"] not in excluded_types:
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_property(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

price_holder = soup.select_one("span[data-testid='price']")
price = int(price_holder.text.replace("$", "").replace(",", ""))

info_holders = soup.select("dt")
time_listed = info_holders[0].text
views = int(info_holders[2].text.replace(",", ""))
saves = info_holders[4].text

property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")

property_data = PropertyData(
name=row["name"],
price=price,
time_on_zillow=time_listed,
views=views,
saves=saves
)
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_property,
reader,
[location] * len(reader),
[retries] * len(reader)
)