How to Scrape Zillow with Selenium

When you're searching for a house in the US, Zillow is one of the most prominent platforms to explore. It provides comprehensive listings for properties, including detailed descriptions, photos, and even virtual tours. However, Zillow is also known for implementing robust anti-scraping measures, making it challenging to extract data using traditional methods.

In this article, we’ll explore how to overcome these challenges using Selenium, which allows us to interact with websites in a way that mimics human behavior, making it more resilient against anti-bot measures.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Zillow with Selenium

Need to scrape Zillow? We've got you covered.

  1. Create a new project folder.
  2. Inside the folder, create a .env file and add your API key in this format:
SCRAPEOPS_API_KEY=your_api_key_here
  3. Create a file, e.g. main.py, and add the following code to it:
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, fields, asdict
import time
from dotenv import load_dotenv
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException

load_dotenv()

API_KEY = os.getenv("SCRAPEOPS_API_KEY")

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    property_type: str = ""
    street_address: str = ""
    locality: str = ""
    region: str = ""
    postal_code: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class PropertyData:
    name: str = ""
    price: int = 0
    time_on_zillow: str = ""
    views: int = 0
    saves: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10):
    url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')  # Run in headless mode

    for attempt in range(retries):
        try:
            with webdriver.Chrome(options=options) as driver:
                driver.get(scrapeops_proxy_url)

                # Wait for the body to ensure page has started loading
                WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Wait for and find script elements
                script_elements = WebDriverWait(driver, timeout).until(
                    EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
                )

                for script in script_elements:
                    json_data = json.loads(script.get_attribute('innerHTML'))
                    if json_data["@type"] != "BreadcrumbList":
                        search_data = SearchData(
                            name=json_data["name"],
                            property_type=json_data["@type"],
                            street_address=json_data["address"]["streetAddress"],
                            locality=json_data["address"]["addressLocality"],
                            region=json_data["address"]["addressRegion"],
                            postal_code=json_data["address"]["postalCode"],
                            url=json_data["url"]
                        )
                        data_pipeline.add_data(search_data)

                logger.info(f"Successfully parsed data from: {url}")
                return  # Success, exit the function

        except (TimeoutException, WebDriverException) as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")

    raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_property(row, location, retries=3, timeout=10):
    url = row["url"]
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')

    for attempt in range(retries):
        try:
            with webdriver.Chrome(options=options) as driver:
                driver.get(scrapeops_proxy_url)

                # Wait for the body to ensure page has started loading
                WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Extract price
                price_element = WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
                )
                price = int(price_element.text.replace("$", "").replace(",", ""))

                # Extract other information
                info_elements = driver.find_elements(By.TAG_NAME, "dt")
                time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
                views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
                saves = info_elements[4].text if len(info_elements) > 4 else "No saves"

                property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")

                property_data = PropertyData(
                    name=row["name"],
                    price=price,
                    time_on_zillow=time_listed,
                    views=views,
                    saves=saves
                )
                property_pipeline.add_data(property_data)
                property_pipeline.close_pipeline()

                logger.info(f"Successfully parsed: {url}")
                return  # Success, exit the function

        except (TimeoutException, WebDriverException, NoSuchElementException) as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")

    raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_property,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["pr"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
  4. Run the script, and it will generate a CSV file based on your search. For example, searching for properties in "pr" (Puerto Rico) will output a file called pr.csv.

Feel free to change any of the following:

  • MAX_THREADS: Determines the maximum number of threads used for concurrent scraping and processing.
  • MAX_RETRIES: Sets the maximum number of retries for each request in case of failure (e.g., network issues, server errors).
  • PAGES: Specifies the number of pages to scrape for each keyword. Each page contains multiple property listings.
  • LOCATION: Defines the geographical location for the scraping. This parameter is used to adjust the proxy location to simulate requests from a specific country.
  • keyword_list: A list of keywords representing different geographical areas or search terms on Zillow. Each keyword triggers a separate scraping job. ("pr" is Puerto Rico; if you want Michigan, add "mi".)

The script then processes the file and generates individual reports for each property listed in pr.csv.
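
For example (illustrative values only), crawling the first two pages for both Puerto Rico and Michigan through US-based proxies would look like this:

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2                     # scrape result pages 1 and 2 for each keyword
LOCATION = "us"               # route proxy traffic through US servers
keyword_list = ["pr", "mi"]   # Puerto Rico and Michigan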


How to Architect Our Zillow Scraper with Selenium

The Zillow scraper we are building will consist of two main components:

  1. Crawler: This component will search for properties in a specific location, gathering relevant data and saving it in a CSV file. Each search will yield results like property names, addresses, and URLs, allowing us to compile a comprehensive dataset.
  2. Parser: Once the crawler completes, the parser will read the CSV file and scrape additional details for each individual property listed, such as price, time on the market, and views.

We will focus on the following key concepts during development:

  • Parsing: Extracting valuable data from Zillow's web pages using Selenium.
  • Pagination: Handling multiple pages of search results to ensure all available listings are collected.
  • Data Storage: Saving the scraped data into CSV reports for further processing and analysis.
  • Concurrency: Speeding up the scraping process by executing multiple tasks (page scrapes) simultaneously using multithreading.
  • Proxy Integration: Overcoming anti-bot measures by routing requests through a proxy service to reduce detection and blocking.

Understanding How to Scrape Zillow with Selenium

In this section, we'll look at how Selenium can help us mimic real browser interactions to retrieve pages, extract data, navigate through pagination, and handle geolocation restrictions.


Step 1: How To Request Zillow Pages

To scrape Zillow effectively, we first need to load the pages. While a traditional GET request (for example, with the requests library) can fetch a page’s HTML, Zillow's anti-bot protections make that approach unreliable. Instead, we'll use Selenium to simulate a human browsing experience.

In the code, the following URL structure represents a search result page for Puerto Rico, with pagination handled by the number at the end:

https://www.zillow.com/pr/2_p/

Zillow Search Page

Here:

  • pr refers to the location (Puerto Rico),
  • 2_p specifies that we are on the second page of results.

Using Selenium, we can open this URL in a browser instance, wait for the page to load, and extract the necessary data, while minimizing the chances of detection.
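
As a quick illustration of that pattern (a standalone sketch, not part of the scraper itself), the paginated URLs can be built with a simple f-string:

keyword = "pr"  # location slug, e.g. Puerto Rico

for page_number in range(3):
    print(f"https://www.zillow.com/{keyword}/{page_number + 1}_p/")

# https://www.zillow.com/pr/1_p/
# https://www.zillow.com/pr/2_p/
# https://www.zillow.com/pr/3_p/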

For the house below, our URL is

https://www.zillow.com/homedetails/459-Carr-Km-7-2-Int-Bo-Arenales-Aguadilla-PR-00603/363559698_zpid/

Zillow Property Page


Step 2: How To Extract Data From Zillow Results and Pages

Extracting data from Zillow using Selenium involves interacting with both JSON data on search result pages and HTML elements on individual property pages.

Let’s break down how we handle each scenario:

Extracting JSON Data from Search Results

On search results pages, Zillow embeds key property data within a JSON structure, which can be extracted using Selenium.

Here is the search page and the JSON blob inside it.

Inspect Zillow Search Results Page

We will wait for all script elements of type application/ld+json, which contain the data we need.

Here’s how we would approach this in our scraper:

# Extract JSON data from search results
script_elements = WebDriverWait(driver, timeout).until(
    EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
)

for script in script_elements:
    json_data = json.loads(script.get_attribute('innerHTML'))
    if json_data["@type"] != "BreadcrumbList":
        # Extract relevant fields from JSON
        search_data = SearchData(
            name=json_data["name"],
            property_type=json_data["@type"],
            street_address=json_data["address"]["streetAddress"],
            locality=json_data["address"]["addressLocality"],
            region=json_data["address"]["addressRegion"],
            postal_code=json_data["address"]["postalCode"],
            url=json_data["url"]
        )
        data_pipeline.add_data(search_data)

This approach allows us to gather crucial information, such as property names, addresses, and URLs, from the search results.
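
For reference, once parsed with json.loads(), a single listing blob looks roughly like the heavily trimmed, hypothetical dictionary below; the exact @type value varies by listing, and anything other than "BreadcrumbList" is treated as a property:

json_data = {
    "@type": "SingleFamilyResidence",  # assumed example type
    "name": "459 Carr Km 7 2 Int Bo Arenales, Aguadilla, PR 00603",
    "address": {
        "streetAddress": "459 Carr Km 7 2 Int Bo Arenales",
        "addressLocality": "Aguadilla",
        "addressRegion": "PR",
        "postalCode": "00603"
    },
    "url": "https://www.zillow.com/homedetails/459-Carr-Km-7-2-Int-Bo-Arenales-Aguadilla-PR-00603/363559698_zpid/"
}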

Extracting HTML Data from Individual Property Pages

On individual property pages, the data is usually buried within the HTML, such as the price, time on Zillow, views, and saves.

Here is a look at some of the HTML we want to parse for an individual property page.

Inspect Zillow Property Page

Using Selenium, we can wait for these elements to load and then extract the data using their corresponding CSS selectors or tags:

# Wait for the price element and extract its value
price_element = WebDriverWait(driver, timeout).until(
    EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
)
price = int(price_element.text.replace("$", "").replace(",", ""))

# Extract other details such as time listed, views, and saves
info_elements = driver.find_elements(By.TAG_NAME, "dt")
time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
saves = info_elements[4].text if len(info_elements) > 4 else "No saves"

This process ensures we retrieve all necessary data fields for each property and store them in the CSV report.


Step 3: How to Control Pagination

Controlling pagination is straightforward, as Zillow URLs follow a predictable pattern. The page number is embedded directly in the URL, for example:

  • Page 1: https://www.zillow.com/pr/1_p/
  • Page 2: https://www.zillow.com/pr/2_p/
  • Page 3: https://www.zillow.com/pr/3_p/

In our scraper, we simply increment the page number to navigate through multiple result pages.

The scrape_search_results function handles pagination by iterating over pages:

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10):
    url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
    # The rest of the code follows to scrape data from this page

Step 4: Handling Geolocated Data

To ensure our scraper works even when location-based restrictions apply, we use the ScrapeOps API to route requests through servers located in different countries.

When interacting with the ScrapeOps API, we'll also pass in a country param. country has no effect on our actual search results; instead, it routes the request through a server in whichever country we specify.

For instance, if we want to appear in the US, we'd pass us in as our country.

This helps bypass Zillow’s geolocation blocks and improves the chances of successful scraping.

The get_scrapeops_url() function integrates this proxy service:

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

Here, the location parameter determines which country we appear to be browsing from (e.g., us for the US).

This doesn’t change the search results but helps us avoid being blocked by Zillow’s anti-scraping mechanisms.


Setting Up Our Zillow Scraper Project

You can run the following commands to get set up.

Create a New Project Folder

mkdir <your_directory_name>
cd <your_directory_name>

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate # Linux
# OR
venv\Scripts\activate # Windows

Install Our Dependencies

pip install selenium python-dotenv

We'll use selenium to automate web browser interactions.

We'll use python-dotenv to securely manage sensitive information like login credentials or API keys by storing them in a separate .env file, which helps keep our main code clean and our secrets safe from accidental exposure.
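
As a minimal sketch of how the two pieces fit together, the .env file holds the key and python-dotenv loads it into the environment at startup:

# .env (kept out of version control)
# SCRAPEOPS_API_KEY=your_api_key_here

import os
from dotenv import load_dotenv

load_dotenv()                             # read key/value pairs from .env
API_KEY = os.getenv("SCRAPEOPS_API_KEY")  # now available to the scraper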


Build A Zillow Search Crawler

In this section, we'll build a Zillow search crawler by combining several key components:

  • parsing search results,
  • handling pagination,
  • storing data, and
  • adding concurrency.

We’ll also look at how to bypass anti-bot measures to ensure our scraper runs effectively in production.


Step 1: Create a Simple Search Data Parser

Let’s create the core function, scrape_search_results(). This function handles the search result page scraping, extracting necessary details (like property URLs, addresses, and prices) and storing them in a CSV file.

Here’s an outline:

def scrape_search_results(keyword, location, retries=3, timeout=10):
    url = f"https://www.zillow.com/{keyword}/"
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')

    for attempt in range(retries):
        try:
            with webdriver.Chrome(options=options) as driver:
                driver.get(scrapeops_proxy_url)

                # Wait for the body to ensure page has started loading
                WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Wait for and find script elements
                script_elements = WebDriverWait(driver, timeout).until(
                    EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
                )

                for script in script_elements:
                    json_data = json.loads(script.get_attribute('innerHTML'))
                    if json_data["@type"] != "BreadcrumbList":
                        search_data = {
                            "name": json_data["name"],
                            "property_type": json_data["@type"],
                            "street_address": json_data["address"]["streetAddress"],
                            "locality": json_data["address"]["addressLocality"],
                            "region": json_data["address"]["addressRegion"],
                            "postal_code": json_data["address"]["postalCode"],
                            "url": json_data["url"]
                        }
                        print(search_data)

                logger.info(f"Successfully parsed data from: {url}")
                return  # Success, exit the function

        except (TimeoutException, WebDriverException) as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")

    raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")

This function first constructs the Zillow search URL from the keyword (the location slug), loads the page through the ScrapeOps proxy in a headless Chrome instance, extracts the embedded JSON data, and prints it.

We've also included a retry mechanism to handle potential errors during scraping.


Step 2: Add Pagination

Pagination is critical for collecting data from multiple pages. Zillow uses page numbers in the URL (_p/) to navigate between search results.

As discussed earlier, we need to increment the page number in our URL.

Here’s how:

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)

  • Keyword: Represents the search location (e.g., city or state).
  • Page: A variable we increment with each loop iteration to move through the search result pages.
  • range(): We use Python’s range() to handle multiple pages of results. Since Zillow pages start at 1 and range() starts at 0, we add +1 to the page number in the URL.

This allows us to scrape multiple pages of Zillow search results by looping through the specified number of pages.
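
For example (illustrative arguments), the call below would request pages 1 through 3 of the Puerto Rico search one after another:

# Scrape the first three result pages for Puerto Rico, sequentially
start_scrape("pr", 3, "us", retries=3)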


Step 3: Storing the Scraped Data

To store the scraped data, we can create a SearchData class to structure the extracted information, like property address, price, and more.

This ensures that we store data consistently in our CSV file.

@dataclass
class SearchData:
    name: str = ""
    property_type: str = ""
    street_address: str = ""
    locality: str = ""
    region: str = ""
    postal_code: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

We created a SearchData class to structure our scraped property information. This class:

  • Uses Python's @dataclass decorator for automatic method generation.
  • Defines fields for property details like name, type, address components, and URL.
  • Implements a post-initialization method to check and clean string fields.
  • Sets default values for empty fields and strips whitespace from non-empty ones.

Here is the DataPipeline class that handles saving data to a CSV file:

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

We create a DataPipeline class to handle data efficiently and store it in CSV format:

  • We accumulate data in a storage queue until it hits a set limit.
  • The save_to_csv method writes data to a CSV file. It creates new files or appends to existing ones as needed.
  • We use DictWriter for flexible field handling, writing headers for new files.
  • After saving, we clear the queue to prevent data duplication.
  • We implement a flag csv_file_open to avoid concurrent CSV writes.
  • The close_pipeline method saves any leftover data before shutdown.
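
To make the flow concrete, a minimal (hypothetical) use of the pipeline looks like this:

pipeline = DataPipeline(csv_filename="example.csv", storage_queue_limit=50)

# Queue items as they are scraped; a CSV write triggers once the queue fills up
pipeline.add_data(SearchData(name="Example listing", url="https://www.zillow.com/homedetails/example/"))

# Flush anything still queued before the program exits
pipeline.close_pipeline()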

Step 4: Adding Concurrency

To speed up the scraping process, especially when scraping multiple pages or properties, we can introduce concurrency using Python’s ThreadPoolExecutor.

This allows us to scrape several pages simultaneously, reducing overall runtime.

import concurrent.futures

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

We implemented a start_scrape function to introduce concurrency:

  • It uses Python's ThreadPoolExecutor to run multiple scraping tasks simultaneously
  • We map the scrape_search_results function across multiple threads
  • This function takes parameters like keyword, pages, location, and data pipeline
  • We control concurrency with the max_threads parameter
  • We include a retry mechanism for resilience against failures

Step 5: Bypassing Anti-Bots

To bypass Zillow’s anti-bot measures, we integrate ScrapeOps Proxy Aggregator by using the get_scrapeops_url() function. This function generates a proxy URL to route our requests through servers in different regions, making it more difficult for Zillow to block our scrapers.

import os
from dotenv import load_dotenv
from urllib.parse import urlencode

load_dotenv()
API_KEY = os.getenv("SCRAPEOPS_API_KEY")

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

print(get_scrapeops_url('https://zillow.com'))

By passing the url and a location parameter (e.g., us), we ensure our requests are routed through a residential proxy, minimizing the risk of getting blocked by Zillow’s anti-bot system.


Step 6: Production Run

Finally, we define the main method to initiate the scraping process.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["pr"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

We set up the main execution flow in the script's entry point:

  • We define constants for retries, thread count, pages to scrape, and location
  • The script accepts a list of keywords to search for
  • We iterate through each keyword:
    • We create a unique filename for each keyword
    • We instantiate a DataPipeline for each keyword
    • We call start_scrape to begin the concurrent scraping process
    • After scraping, we close the pipeline to ensure data is saved
    • We collect filenames for potential aggregation later
  • We use logging to track the start and completion of the crawl process

This structure allows for efficient, concurrent scraping of multiple keywords, with each keyword's data saved to a separate CSV file.

Here is the full code for the Zillow data crawler:

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, fields, asdict
import time
from dotenv import load_dotenv
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException

load_dotenv()

API_KEY = os.getenv("SCRAPEOPS_API_KEY")

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    property_type: str = ""
    street_address: str = ""
    locality: str = ""
    region: str = ""
    postal_code: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10):
    url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')  # Run in headless mode

    for attempt in range(retries):
        try:
            with webdriver.Chrome(options=options) as driver:
                driver.get(scrapeops_proxy_url)

                # Wait for the body to ensure page has started loading
                WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Wait for and find script elements
                script_elements = WebDriverWait(driver, timeout).until(
                    EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
                )

                for script in script_elements:
                    json_data = json.loads(script.get_attribute('innerHTML'))
                    if json_data["@type"] != "BreadcrumbList":
                        search_data = SearchData(
                            name=json_data["name"],
                            property_type=json_data["@type"],
                            street_address=json_data["address"]["streetAddress"],
                            locality=json_data["address"]["addressLocality"],
                            region=json_data["address"]["addressRegion"],
                            postal_code=json_data["address"]["postalCode"],
                            url=json_data["url"]
                        )
                        data_pipeline.add_data(search_data)

                logger.info(f"Successfully parsed data from: {url}")
                return  # Success, exit the function

        except (TimeoutException, WebDriverException) as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")

    raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["pr"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

Let's summarize the key steps we took to build this crawler:

  1. Bypassing Anti-Bots: We implemented get_scrapeops_url to use ScrapeOps proxy, allowing us to bypass anti-bot measures.
  2. Simple Search Data Parser: We created scrape_search_results to extract property data from Zillow pages using Selenium.
  3. Pagination: We added start_scrape to handle multiple pages of search results (initially without concurrency).
  4. Data Storage: We developed SearchData class to structure our scraped information and DataPipeline class to manage data storage and CSV writing.
  5. Concurrency: We enhanced the start_scrape function with ThreadPoolExecutor to scrape multiple pages simultaneously, improving efficiency.
  6. Production Setup: We set up the main execution block to handle multiple keywords and manage the overall scraping process.

Build A Zillow Scraper

Our Zillow crawler is already capable of searching for properties, parsing the results, and saving the data in a CSV. Now, we’ll move on to building a scraper that processes this CSV and scrapes individual property details. Our scraper will:

  • Read the CSV file generated by the crawler.
  • Scrape detailed information about each property.
  • Save the property details in a structured format.
  • Implement concurrency for efficiency.
  • Integrate with a proxy to bypass anti-bot protections.

Step 1: Create a Simple Property Data Parser

We’ll begin by creating a function to parse property data from the Zillow property pages. This function will look similar to the initial parsing function we wrote for search results, but now we’re dealing with individual property pages, which have different HTML structures.

Once again, we want to get past anti-bots and anything else that might block us. We will use the same get_scrapeops_url() function:

scrapeops_proxy_url = get_scrapeops_url(url, location=location)

Here’s an example of a basic property data parser:

def process_property(row, location, retries=3, timeout=10):
    url = row["url"]
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')

    for attempt in range(retries):
        try:
            with webdriver.Chrome(options=options) as driver:
                driver.get(scrapeops_proxy_url)

                # Wait for the body to ensure page has started loading
                WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Extract price
                price_element = WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
                )
                price = int(price_element.text.replace("$", "").replace(",", ""))

                # Extract other information
                info_elements = driver.find_elements(By.TAG_NAME, "dt")
                time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
                views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
                saves = info_elements[4].text if len(info_elements) > 4 else "No saves"

                property_data = {
                    'name': row["name"],
                    'price': price,
                    'time_on_zillow': time_listed,
                    'views': views,
                    'saves': saves
                }

                print(property_data)

                logger.info(f"Successfully parsed: {url}")
                return  # Success, exit the function

        except (TimeoutException, WebDriverException, NoSuchElementException) as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")

    raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")

Key points to note:

  • "span[data-testid='price']" is the CSS selector for the price.
  • int(price_element.text.replace("$", "").replace(",", "")) cleans up the price text and converts it into an integer.
  • We extract time_listed, views, and saves from the info_elements list, which holds the page's dt elements.
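
For example (illustrative strings), the cleanup turns the displayed text into numbers we can store:

price_text = "$450,000"                                    # hypothetical displayed price
price = int(price_text.replace("$", "").replace(",", ""))  # 450000

views_text = "1,234"                                       # hypothetical views counter
views = int(views_text.replace(",", ""))                   # 1234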

Step 2: Loading URLs to Scrape

To feed property URLs into our parsing function, we’ll load the CSV file generated by the crawler. For each row (property) in the CSV, we’ll call process_property(). Later, we’ll add concurrency to speed things up.

def process_results(csv_file, location, retries=3):
    with open(csv_file, newline='') as file:
        reader = csv.DictReader(file)
        for row in reader:
            process_property(row, location, retries=retries)

Here, we’re iterating over the rows in the CSV file and passing each row (which contains the property URL) to the process_property() function.


Step 3: Storing the Scraped Data

We’ll store the scraped property details in a structured format using a PropertyData class.

This class will be similar to the SearchData class we used earlier but specific to the details scraped from individual property pages.

@dataclass
class PropertyData:
    name: str = ""
    price: int = 0
    time_on_zillow: str = ""
    views: int = 0
    saves: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

The scraped data is saved in a CSV, where each property is stored as a row. We'll re-use the DataPipeline class we created in the crawler section, as follows:

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


@dataclass
class PropertyData:
    name: str = ""
    price: int = 0
    time_on_zillow: str = ""
    views: int = 0
    saves: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


def process_property(row, location, retries=3, timeout=10):
    url = row["url"]
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')

    for attempt in range(retries):
        try:
            with webdriver.Chrome(options=options) as driver:
                driver.get(scrapeops_proxy_url)

                # Wait for the body to ensure page has started loading
                WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Extract price
                price_element = WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
                )
                price = int(price_element.text.replace("$", "").replace(",", ""))

                # Extract other information
                info_elements = driver.find_elements(By.TAG_NAME, "dt")
                time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
                views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
                saves = info_elements[4].text if len(info_elements) > 4 else "No saves"

                property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")

                property_data = PropertyData(
                    name=row["name"],
                    price=price,
                    time_on_zillow=time_listed,
                    views=views,
                    saves=saves
                )
                property_pipeline.add_data(property_data)
                property_pipeline.close_pipeline()

                logger.info(f"Successfully parsed: {url}")
                return  # Success, exit the function

        except (TimeoutException, WebDriverException, NoSuchElementException) as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")

    raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")


Step 4: Adding Concurrency

To scrape multiple properties in parallel, we’ll use Python’s ThreadPoolExecutor. This helps us speed up the process by running multiple process_property() functions concurrently.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_property,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )

Here, executor.map() handles the parallel processing: process_property() is called once per CSV row, paired with the matching location and retries values, and the results are saved concurrently.
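
If the way executor.map() pairs up its argument lists is unclear, this small standalone sketch shows the same idea with plain functions:

import concurrent.futures

def greet(name, city):
    return f"{name} from {city}"

names = ["Ana", "Luis"]
cities = ["San Juan", "Aguadilla"]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # greet("Ana", "San Juan") and greet("Luis", "Aguadilla") run concurrently
    print(list(executor.map(greet, names, cities)))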


Step 5: Production Run

We're now ready to test this thing out in production.

Once again, we've set PAGES to 1 and our LOCATION to "uk". Feel free to change any of the constants within main to tweak your results.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["pr"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

With that, our full production-ready code is as follows:

from dotenv import load_dotenv
import os
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException
from dataclasses import fields, asdict, dataclass
import csv
import logging
import time
import concurrent.futures
import json

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()
API_KEY = os.getenv("SCRAPEOPS_API_KEY")


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


@dataclass
class SearchData:
    name: str = ""
    property_type: str = ""
    street_address: str = ""
    locality: str = ""
    region: str = ""
    postal_code: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10):
    url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')  # Run in headless mode

    for attempt in range(retries):
        try:
            with webdriver.Chrome(options=options) as driver:
                driver.get(scrapeops_proxy_url)

                # Wait for the body to ensure page has started loading
                WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Wait for and find script elements
                script_elements = WebDriverWait(driver, timeout).until(
                    EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
                )

                for script in script_elements:
                    json_data = json.loads(script.get_attribute('innerHTML'))
                    if json_data["@type"] != "BreadcrumbList":
                        search_data = SearchData(
                            name=json_data["name"],
                            property_type=json_data["@type"],
                            street_address=json_data["address"]["streetAddress"],
                            locality=json_data["address"]["addressLocality"],
                            region=json_data["address"]["addressRegion"],
                            postal_code=json_data["address"]["postalCode"],
                            url=json_data["url"]
                        )
                        data_pipeline.add_data(search_data)

                logger.info(f"Successfully parsed data from: {url}")
                return  # Success, exit the function

        except (TimeoutException, WebDriverException) as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")

    raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


@dataclass
class PropertyData:
    name: str = ""
    price: int = 0
    time_on_zillow: str = ""
    views: int = 0
    saves: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


def process_property(row, location, retries=3, timeout=10):
    url = row["url"]
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')

    for attempt in range(retries):
        try:
            with webdriver.Chrome(options=options) as driver:
                driver.get(scrapeops_proxy_url)

                # Wait for the body to ensure page has started loading
                WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Extract price
                price_element = WebDriverWait(driver, timeout).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
                )
                price = int(price_element.text.replace("$", "").replace(",", ""))

                # Extract other information
                info_elements = driver.find_elements(By.TAG_NAME, "dt")
                time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
                views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
                saves = info_elements[4].text if len(info_elements) > 4 else "No saves"

                property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")

                property_data = PropertyData(
                    name=row["name"],
                    price=price,
                    time_on_zillow=time_listed,
                    views=views,
                    saves=saves
                )
                property_pipeline.add_data(property_data)
                property_pipeline.close_pipeline()

                logger.info(f"Successfully parsed: {url}")
                return  # Success, exit the function

        except (TimeoutException, WebDriverException, NoSuchElementException) as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")

    raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_property,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["pr"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")


    logger.info(f"Scrape starting...")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

    logger.info(f"Scrape complete.")

To see the output, launch the terminal and run the script.

python <your_script_name>.py

What did you notice?

The script creates a file called pr.csv. It then reads this file and creates an individual report on each house.
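
As a quick sanity check (assuming pr.csv was created in the working directory), you can confirm the crawl file's columns match the SearchData fields:

import csv

with open("pr.csv", newline="") as file:
    reader = csv.DictReader(file)
    print(reader.fieldnames)
    # Expected: ['name', 'property_type', 'street_address',
    #            'locality', 'region', 'postal_code', 'url']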


Legal and Ethical Considerations

When utilizing Zillow, it’s important to follow their Terms of Use. In addition, be sure to review their robots.txt file, which outlines rules for automated access. Not following these guidelines could lead to account suspension or a permanent ban.

Generally, scraping publicly available data is legal in many regions, but accessing private data—such as that which requires login or authentication—requires permission.

If you’re unsure about the legal aspects of your scraping activities, it’s advisable to seek legal advice from an attorney who is familiar with the laws in your area.


Conclusion

You've now completed our tutorial and have added another valuable skill to your scraping toolkit. You’ve learned about parsing, pagination, data storage, concurrency, and proxy integration. Now go ahead and create something awesome!

Interested in the tools covered in this guide? Check out the resources below:


More Python Web Scraping Guides

At ScrapeOps, we have a wealth of resources to help you improve your web scraping skills. Whether you’re just getting started or already a pro, you’ll find something useful.

If you’re looking to dive deeper, check out our Selenium Web Scraping Playbook or explore the following guides: