How to Scrape Zillow with Selenium
When you're searching for a house in the US, Zillow is one of the most prominent platforms to explore. It provides comprehensive listings for properties, including detailed descriptions, photos, and even virtual tours. However, Zillow is also known for implementing robust anti-scraping measures, making it challenging to extract data using traditional methods.
In this article, we’ll explore how to overcome these challenges using Selenium, which allows us to interact with websites in a way that mimics human behavior, making it more resilient against anti-bot measures.
- TLDR: How to Scrape Zillow
- How To Architect Our Scraper
- Understanding How To Scrape Zillow
- Setting Up Our Zillow Scraper
- Build A Zillow Search Crawler
- Build A Zillow Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Zillow with Selenium
Need to scrape Zillow? We've got you covered.
- Create a new project folder.
- Inside the folder, create a .env file and add your API key in this format: SCRAPEOPS_API_KEY=your_api_key_here
- Create a file, e.g. main.py, and insert the following code into it:
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, fields, asdict
import time
from dotenv import load_dotenv
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException
load_dotenv()
API_KEY = os.getenv("SCRAPEOPS_API_KEY")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class PropertyData:
name: str = ""
price: int = 0
time_on_zillow: str = ""
views: int = 0
saves: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10):
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
options = webdriver.ChromeOptions()
options.add_argument('--headless') # Run in headless mode
for attempt in range(retries):
try:
with webdriver.Chrome(options=options) as driver:
driver.get(scrapeops_proxy_url)
# Wait for the body to ensure page has started loading
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Wait for and find script elements
script_elements = WebDriverWait(driver, timeout).until(
EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
)
for script in script_elements:
json_data = json.loads(script.get_attribute('innerHTML'))
if json_data["@type"] != "BreadcrumbList":
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
return # Success, exit the function
except (TimeoutException, WebDriverException) as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")
raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_property(row, location, retries=3, timeout=10):
url = row["url"]
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
options = webdriver.ChromeOptions()
options.add_argument('--headless')
for attempt in range(retries):
try:
with webdriver.Chrome(options=options) as driver:
driver.get(scrapeops_proxy_url)
# Wait for the body to ensure page has started loading
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Extract price
price_element = WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
)
price = int(price_element.text.replace("$", "").replace(",", ""))
# Extract other information
info_elements = driver.find_elements(By.TAG_NAME, "dt")
time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
saves = info_elements[4].text if len(info_elements) > 4 else "No saves"
property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
property_data = PropertyData(
name=row["name"],
price=price,
time_on_zillow=time_listed,
views=views,
saves=saves
)
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()
logger.info(f"Successfully parsed: {url}")
return # Success, exit the function
except (TimeoutException, WebDriverException, NoSuchElementException) as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")
raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_property,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
- Run the script, and it will generate a CSV file based on your search. For example, searching for properties in "pr" (Puerto Rico) will output a file called pr.csv.
Feel free to change any of the following:
- MAX_THREADS: Determines the maximum number of threads used for concurrent scraping and processing.
- MAX_RETRIES: Sets the maximum number of retries for each request in case of failure (e.g., network issues, server errors).
- PAGES: Specifies the number of pages to scrape for each keyword. Each page contains multiple property listings.
- LOCATION: Defines the geographical location for the scraping. This parameter is used to adjust the proxy location to simulate requests from a specific country.
- keyword_list: A list of keywords representing different geographical areas or search terms on Zillow. Each keyword triggers a separate scraping job. ("pr" is Puerto Rico; if you want Michigan, add "mi".)
The script then processes the file and generates individual reports for each property listed in pr.csv.
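For example, to crawl the first three pages of Michigan and Puerto Rico listings while routing through US proxies, you would only change the constants at the bottom of main.py:
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3                      # first three pages of each search
LOCATION = "us"                # proxy location passed to ScrapeOps
keyword_list = ["mi", "pr"]    # Michigan and Puerto Rico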
How to Architect Our Zillow Scraper with Selenium
The Zillow scraper we are building will consist of two main components:
- Crawler: This component will search for properties in a specific location, gathering relevant data and saving it in a CSV file. Each search will yield results like property names, addresses, and URLs, allowing us to compile a comprehensive dataset.
- Parser: Once the crawler completes, the parser will read the CSV file and scrape additional details for each individual property listed, such as price, time on the market, and views.
We will focus on the following key concepts during development:
- Parsing: Extracting valuable data from Zillow's web pages using Selenium.
- Pagination: Handling multiple pages of search results to ensure all available listings are collected.
- Data Storage: Saving the scraped data into CSV reports for further processing and analysis.
- Concurrency: Speeding up the scraping process by executing multiple tasks (page scrapes) simultaneously using multithreading.
- Proxy Integration: Overcoming anti-bot measures by routing requests through a proxy service to reduce detection and blocking.
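Putting these pieces together, the overall flow is roughly the following sketch (using the function and class names we build in the sections below):
# Two-stage flow: crawl search results into a CSV, then scrape each property
crawl_pipeline = DataPipeline(csv_filename="pr.csv")
start_scrape("pr", pages=1, location="us", data_pipeline=crawl_pipeline)  # Crawler
crawl_pipeline.close_pipeline()
process_results("pr.csv", "us")                                           # Parser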
Understanding How to Scrape Zillow with Selenium
In this section, we'll look at how Selenium can help us mimic real browser interactions to retrieve pages, extract data, navigate through pagination, and handle geolocation restrictions.
Step 1: How To Request Zillow Pages
To scrape Zillow effectively, we first need to load the pages. While a traditional GET request (e.g., with the requests library) can fetch a page’s HTML, Zillow's anti-bot protections make that approach unreliable. Instead, we'll use Selenium to simulate a human browsing experience.
In the code, the following URL structure represents a search result page for Puerto Rico, with pagination handled by the number at the end:
https://www.zillow.com/pr/2_p/
Here:
- pr refers to the location (Puerto Rico),
- 2_p specifies that we are on the second page of results.
Using Selenium, we can open this URL in a browser instance, wait for the page to load, and extract the necessary data, while minimizing the chances of detection.
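Since the page number is just part of the URL, generating the search URLs is plain string formatting. A small sketch:
# Build the first three paginated search URLs for Puerto Rico ("pr")
keyword = "pr"
search_urls = [f"https://www.zillow.com/{keyword}/{page + 1}_p/" for page in range(3)]
# -> ['https://www.zillow.com/pr/1_p/', 'https://www.zillow.com/pr/2_p/', 'https://www.zillow.com/pr/3_p/']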
For the house below, our URL is
https://www.zillow.com/homedetails/459-Carr-Km-7-2-Int-Bo-Arenales-Aguadilla-PR-00603/363559698_zpid/
Step 2: How To Extract Data From Zillow Results and Pages
Extracting data from Zillow using Selenium involves interacting with both JSON data on search result pages and HTML elements on individual property pages.
Let’s break down how we handle each scenario:
Extracting JSON Data from Search Results
On search results pages, Zillow embeds key property data within a JSON structure, which can be extracted using Selenium.
Here is the search page and the JSON blob inside it.
We will wait for all script
elements of type application/ld+json
, which contain the data we need.
Here’s how we would approach this in our scraper:
# Extract JSON data from search results
script_elements = WebDriverWait(driver, timeout).until(
EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
)
for script in script_elements:
json_data = json.loads(script.get_attribute('innerHTML'))
if json_data["@type"] != "BreadcrumbList":
# Extract relevant fields from JSON
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)
This approach allows us to gather crucial information, such as property names, addresses, and URLs, from the search results.
Extracting HTML Data from Individual Property Pages
On individual property pages, the data is usually buried within the HTML, such as the price, time on Zillow, views, and saves.
Here is a look at some of the HTML we want to parse for an individual property page.
Using Selenium, we can wait for these elements to load and then extract the data using their corresponding CSS selectors or tags:
# Wait for the price element and extract its value
price_element = WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
)
price = int(price_element.text.replace("$", "").replace(",", ""))
# Extract other details such as time listed, views, and saves
info_elements = driver.find_elements(By.TAG_NAME, "dt")
time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
saves = info_elements[4].text if len(info_elements) > 4 else "No saves"
This process ensures we retrieve all necessary data fields for each property and store them in the CSV report.
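One caveat on the views count: if Zillow renders that field as something non-numeric (a dash, for example), the int() conversion above will raise. If you run into that, a slightly more defensive variant could look like this:
# Optional: guard against non-numeric view counts before converting
views_text = info_elements[2].text.replace(",", "") if len(info_elements) > 2 else ""
views = int(views_text) if views_text.isdigit() else 0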
Step 3: How to Control Pagination
Controlling pagination is straightforward, as Zillow URLs follow a predictable pattern. The page number is embedded directly in the URL, for example:
- Page 1:
https://www.zillow.com/pr/1_p/
- Page 2:
https://www.zillow.com/pr/2_p/
- Page 3:
https://www.zillow.com/pr/3_p/
In our scraper, we simply increment the page number to navigate through multiple result pages.
The scrape_search_results function accepts a page_number and builds the corresponding URL:
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10):
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
# The rest of the code follows to scrape data from this page
Step 4: Handling Geolocated Data
To ensure our scraper works even when location-based restrictions apply, we use the ScrapeOps API to route requests through servers located in different countries.
When interacting with the ScrapeOps API, we'll pass in a country
param as well. country
will not have any effect on our actual search results, but instead it will route us through a server in whichever country we specify.
For instance, if we want to appear in the US, we'd pass us
in as our country.
This helps bypass Zillow’s geolocation blocks and improves the chances of successful scraping.
The get_scrapeops_url()
function integrates this proxy service:
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
Here, the location
parameter determines which country we appear to be browsing from (e.g., us
for the US).
This doesn’t change the search results but helps us avoid being blocked by Zillow’s anti-scraping mechanisms.
Setting Up Our Zillow Scraper Project
You can run the following commands to get set up.
Create a New Project Folder
mkdir <your_directory_name>
cd <your_directory_name>
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate # Linux/macOS
# OR
venv\Scripts\activate # Windows
Install Our Dependencies
pip install selenium python-dotenv
We'll use selenium to automate web browser interactions.
We'll use python-dotenv to securely manage sensitive information like login credentials or API keys by storing them in a separate .env file, which helps keep our main code clean and our secrets safe from accidental exposure.
You'll also need Google Chrome installed. Selenium 4.6+ ships with Selenium Manager, which downloads a matching chromedriver automatically; on older versions, make sure chromedriver is installed and on your PATH.
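If you want to confirm that your .env file is picked up before writing any scraping code, a quick throwaway check (not part of the final script) will do:
# check_env.py - sanity check that the API key loads from .env
import os
from dotenv import load_dotenv

load_dotenv()
print("SCRAPEOPS_API_KEY loaded:", bool(os.getenv("SCRAPEOPS_API_KEY")))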
Build A Zillow Search Crawler
In this section, we'll build a Zillow search crawler by combining several key components:
- parsing search results,
- handling pagination,
- storing data, and
- adding concurrency.
We’ll also look at how to bypass anti-bot measures to ensure our scraper runs effectively in production.
Step 1: Create a Simple Search Data Parser
Let’s create the core function, scrape_search_results(). This function handles the search result page scraping, extracting the key details (property names, types, addresses, and URLs) that we'll later store in a CSV file.
Here’s an outline:
def scrape_search_results(keyword, location, retries=3, timeout=10):
url = f"https://www.zillow.com/{keyword}/"
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
options = webdriver.ChromeOptions()
options.add_argument('--headless')
for attempt in range(retries):
try:
with webdriver.Chrome(options=options) as driver:
driver.get(scrapeops_proxy_url)
# Wait for the body to ensure page has started loading
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Wait for and find script elements
script_elements = WebDriverWait(driver, timeout).until(
EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
)
for script in script_elements:
json_data = json.loads(script.get_attribute('innerHTML'))
if json_data["@type"] != "BreadcrumbList":
search_data = {
"name": json_data["name"],
"property_type": json_data["@type"],
"street_address": json_data["address"]["streetAddress"],
"locality": json_data["address"]["addressLocality"],
"region": json_data["address"]["addressRegion"],
"postal_code": json_data["address"]["postalCode"],
"url": json_data["url"]
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
return # Success, exit the function
except (TimeoutException, WebDriverException) as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")
raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
This function first constructs the Zillow search URL from the location keyword. It then loads the page through the ScrapeOps proxy in a headless Chrome browser, waits for the ld+json script tags, extracts the data, and prints it.
We've also included a retry mechanism to handle potential errors during scraping.
Step 2: Add Pagination
Pagination is critical for collecting data from multiple pages. Zillow uses page numbers in the URL (_p/
) to navigate between search results.
As discussed earlier, we need to increment the page number in our URL.
Here’s how:
def start_scrape(keyword, pages, location, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, retries=retries)
- Keyword: Represents the search location (e.g., city or state).
- Page: A variable we increment with each loop iteration to move through the search result pages.
- range(): We use Python’s range() to handle multiple pages of results. Since Zillow pages start at 1 and range() starts at 0, we add +1 to the page number in the URL.
This allows us to scrape multiple pages of Zillow search results by looping through the specified number of pages.
Step 3: Storing the Scraped Data
To store the scraped data, we can create a SearchData
class to structure the extracted information, like property address, price, and more.
This ensures that we store data consistently in our CSV file.
@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
We created a SearchData class to structure our scraped property information. This class:
- Uses Python's @dataclass decorator for automatic method generation.
- Defines fields for property details like name, type, address components, and URL.
- Implements a post-initialization method to check and clean string fields.
- Sets default values for empty fields and strips whitespace from non-empty ones.
Here is the DataPipeline class that handles saving data to a CSV file:
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
We create a DataPipeline class to handle data efficiently and store it in CSV format:
- We accumulate data in a storage queue until it hits a set limit.
- The save_to_csv method writes data to a CSV file. It creates new files or appends to existing ones as needed.
- We use DictWriter for flexible field handling, writing headers for new files.
- After saving, we clear the queue to prevent data duplication.
- We implement a flag csv_file_open to avoid concurrent CSV writes.
- The close_pipeline method saves any leftover data before shutdown.
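As a quick illustration of how the pipeline is used (a minimal sketch; the crawler wires this up for real in the following steps):
# Queue one record and flush it to disk
pipeline = DataPipeline(csv_filename="example.csv", storage_queue_limit=50)
pipeline.add_data(SearchData(name="123 Example St", url="https://www.zillow.com/homedetails/example/"))
pipeline.close_pipeline()  # writes anything still in the queue to example.csv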
Step 4: Adding Concurrency
To speed up the scraping process, especially when scraping multiple pages or properties, we can introduce concurrency using Python’s ThreadPoolExecutor
.
This allows us to scrape several pages simultaneously, reducing overall runtime.
import concurrent.futures
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
We implemented a start_scrape function to introduce concurrency:
- It uses Python's ThreadPoolExecutor to run multiple scraping tasks simultaneously.
- We map the scrape_search_results function across multiple threads.
- The function takes parameters like keyword, pages, location, and data pipeline.
- We control concurrency with the max_threads parameter.
- We include a retry mechanism for resilience against failures.
Step 5: Bypassing Anti-Bots
To bypass Zillow’s anti-bot measures, we integrate ScrapeOps Proxy Aggregator by using the get_scrapeops_url()
function. This function generates a proxy URL to route our requests through servers in different regions, making it more difficult for Zillow to block our scrapers.
import os
from dotenv import load_dotenv
from urllib.parse import urlencode
load_dotenv()
API_KEY = os.getenv("SCRAPEOPS_API_KEY")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
print(get_scrapeops_url('https://zillow.com'))
By passing the url
and a location
parameter (e.g., us
), we ensure our requests are routed through a residential proxy, minimizing the risk of getting blocked by Zillow’s anti-bot system.
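Running that print statement produces a proxy URL along these lines (API key redacted); this is the URL Selenium actually loads:
https://proxy.scrapeops.io/v1/?api_key=YOUR_API_KEY&url=https%3A%2F%2Fzillow.com&country=us&residential=True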
Step 6: Production Run
Finally, we define the main execution block to initiate the scraping process.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
We set up the main execution flow in the script's entry point:
- We define constants for retries, thread count, pages to scrape, and location.
- The script accepts a list of keywords to search for.
- We iterate through each keyword:
  - We create a unique filename for each keyword.
  - We instantiate a DataPipeline for each keyword.
  - We call start_scrape to begin the concurrent scraping process.
  - After scraping, we close the pipeline to ensure data is saved.
  - We collect filenames for potential aggregation later.
- We use logging to track the start and completion of the crawl process.
This structure allows for efficient, concurrent scraping of multiple keywords, with each keyword's data saved to a separate CSV file.
Here is the full code for the Zillow data crawler:
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, fields, asdict
import time
from dotenv import load_dotenv
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
load_dotenv()
API_KEY = os.getenv("SCRAPEOPS_API_KEY")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10):
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
options = webdriver.ChromeOptions()
options.add_argument('--headless') # Run in headless mode
for attempt in range(retries):
try:
with webdriver.Chrome(options=options) as driver:
driver.get(scrapeops_proxy_url)
# Wait for the body to ensure page has started loading
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Wait for and find script elements
script_elements = WebDriverWait(driver, timeout).until(
EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
)
for script in script_elements:
json_data = json.loads(script.get_attribute('innerHTML'))
if json_data["@type"] != "BreadcrumbList":
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
return # Success, exit the function
except (TimeoutException, WebDriverException) as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")
raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Let's summarize the key steps we took to build this crawler:
- Bypassing Anti-Bots: We implemented get_scrapeops_url to use the ScrapeOps proxy, allowing us to bypass anti-bot measures.
- Simple Search Data Parser: We created scrape_search_results to extract property data from Zillow pages using Selenium.
- Pagination: We added start_scrape to handle multiple pages of search results (initially without concurrency).
- Data Storage: We developed the SearchData class to structure our scraped information and the DataPipeline class to manage data storage and CSV writing.
- Concurrency: We enhanced the start_scrape function with ThreadPoolExecutor to scrape multiple pages simultaneously, improving efficiency.
- Production Setup: We set up the main execution block to handle multiple keywords and manage the overall scraping process.
Build A Zillow Scraper
Our Zillow crawler is already capable of searching for properties, parsing the results, and saving the data in a CSV. Now, we’ll move on to building a scraper that processes this CSV and scrapes individual property details. Our scraper will:
- Read the CSV file generated by the crawler.
- Scrape detailed information about each property.
- Save the property details in a structured format.
- Implement concurrency for efficiency.
- Integrate with a proxy to bypass anti-bot protections.
Step 1: Create a Simple Property Data Parser
We’ll begin by creating a function to parse property data from the Zillow property pages. This function will look similar to the initial parsing function we wrote for search results, but now we’re dealing with individual property pages, which have different HTML structures.
Once again, we want to get past anti-bots and anything else that might block us. We will use the same get_scrapeops_url()
function:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
Here’s an example of a basic property data parser:
def process_property(row, location, retries=3, timeout=10):
url = row["url"]
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
options = webdriver.ChromeOptions()
options.add_argument('--headless')
for attempt in range(retries):
try:
with webdriver.Chrome(options=options) as driver:
driver.get(scrapeops_proxy_url)
# Wait for the body to ensure page has started loading
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Extract price
price_element = WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
)
price = int(price_element.text.replace("$", "").replace(",", ""))
# Extract other information
info_elements = driver.find_elements(By.TAG_NAME, "dt")
time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
saves = info_elements[4].text if len(info_elements) > 4 else "No saves"
property_data = {
'name': row["name"],
'price': price,
'time_on_zillow': time_listed,
'views': views,
'saves': saves
}
print(property_data)
logger.info(f"Successfully parsed: {url}")
return # Success, exit the function
except (TimeoutException, WebDriverException, NoSuchElementException) as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")
raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
Key points to note:
"span[data-testid='price']"
is the CSS selector for the price.int(price_holder.text.replace("$", "").replace(",", ""))
cleans up and converts the price into an integer.- We extract
time_listed
,views
, andsaves
from theinfo_holders
list using the correct CSS selectors.
Step 2: Loading URLs to Scrape
To feed property URLs into our parsing function, we’ll load the CSV file generated by the crawler. For each row (property) in the CSV, we’ll call process_property()
. Later, we’ll add concurrency to speed things up.
def process_results(csv_file, location):
    with open(csv_file, newline='') as file:
        reader = csv.DictReader(file)
        for row in reader:
            process_property(row, location)
Here, we're iterating over the rows in the CSV file and passing each row (which contains the property URL) to the process_property() function.
Step 3: Storing the Scraped Data
We’ll store the scraped property details in a structured format using a PropertyData
class.
This class will be similar to the SearchData
class we used earlier but specific to the details scraped from individual property pages.
@dataclass
class PropertyData:
name: str = ""
price: int = 0
time_on_zillow: str = ""
views: int = 0
saves: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
The scraped data is saved in a CSV, where each property is stored as a row. We'll reuse the DataPipeline class we created in the crawler section, as follows:
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
@dataclass
class PropertyData:
name: str = ""
price: int = 0
time_on_zillow: str = ""
views: int = 0
saves: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
def process_property(row, location, retries=3, timeout=10):
url = row["url"]
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
options = webdriver.ChromeOptions()
options.add_argument('--headless')
for attempt in range(retries):
try:
with webdriver.Chrome(options=options) as driver:
driver.get(scrapeops_proxy_url)
# Wait for the body to ensure page has started loading
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Extract price
price_element = WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
)
price = int(price_element.text.replace("$", "").replace(",", ""))
# Extract other information
info_elements = driver.find_elements(By.TAG_NAME, "dt")
time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
saves = info_elements[4].text if len(info_elements) > 4 else "No saves"
property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
property_data = PropertyData(
name=row["name"],
price=price,
time_on_zillow=time_listed,
views=views,
saves=saves
)
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()
logger.info(f"Successfully parsed: {url}")
return # Success, exit the function
except (TimeoutException, WebDriverException, NoSuchElementException) as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")
raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
Step 4: Adding Concurrency
To scrape multiple properties in parallel, we’ll use Python’s ThreadPoolExecutor
. This helps us speed up the process by running multiple process_property()
functions concurrently.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_property,
reader,
[location] * len(reader),
[retries] * len(reader)
)
Here, executor.map() handles the parallel processing of multiple properties. The process_property() function is called on each CSV row, and the results are saved concurrently.
Step 5: Production Run
We're now ready to test this thing out in production.
Once again, we've set PAGES to 1 and our LOCATION to "uk". Feel free to change any of the constants within main to tweak your results.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
With that, our full production-ready code is as follows:
from dotenv import load_dotenv
import os
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException
from dataclasses import fields, asdict, dataclass
import csv
import logging
import time
import concurrent.futures
import json
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
API_KEY = os.getenv("SCRAPEOPS_API_KEY")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
@dataclass
class SearchData:
name: str = ""
property_type: str = ""
street_address: str = ""
locality: str = ""
region: str = ""
postal_code: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10):
url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
options = webdriver.ChromeOptions()
options.add_argument('--headless') # Run in headless mode
for attempt in range(retries):
try:
with webdriver.Chrome(options=options) as driver:
driver.get(scrapeops_proxy_url)
# Wait for the body to ensure page has started loading
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Wait for and find script elements
script_elements = WebDriverWait(driver, timeout).until(
EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
)
for script in script_elements:
json_data = json.loads(script.get_attribute('innerHTML'))
if json_data["@type"] != "BreadcrumbList":
search_data = SearchData(
name=json_data["name"],
property_type=json_data["@type"],
street_address=json_data["address"]["streetAddress"],
locality=json_data["address"]["addressLocality"],
region=json_data["address"]["addressRegion"],
postal_code=json_data["address"]["postalCode"],
url=json_data["url"]
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
return # Success, exit the function
except (TimeoutException, WebDriverException) as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")
raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
@dataclass
class PropertyData:
name: str = ""
price: int = 0
time_on_zillow: str = ""
views: int = 0
saves: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
def process_property(row, location, retries=3, timeout=10):
url = row["url"]
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
options = webdriver.ChromeOptions()
options.add_argument('--headless')
for attempt in range(retries):
try:
with webdriver.Chrome(options=options) as driver:
driver.get(scrapeops_proxy_url)
# Wait for the body to ensure page has started loading
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Extract price
price_element = WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
)
price = int(price_element.text.replace("$", "").replace(",", ""))
# Extract other information
info_elements = driver.find_elements(By.TAG_NAME, "dt")
time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
saves = info_elements[4].text if len(info_elements) > 4 else "No saves"
property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
property_data = PropertyData(
name=row["name"],
price=price,
time_on_zillow=time_listed,
views=views,
saves=saves
)
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()
logger.info(f"Successfully parsed: {url}")
return # Success, exit the function
except (TimeoutException, WebDriverException, NoSuchElementException) as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}")
raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_property,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["pr"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
logger.info(f"Scrape starting...")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info(f"Scrape complete.")
To see the output, launch the terminal and run the script.
python <your_script_name>.py
What did you notice?
The script creates a file called pr.csv
. It then reads this file and creates an individual report on each house.
Legal and Ethical Considerations
When utilizing Zillow, it’s important to follow their Terms of Use. In addition, be sure to review their robots.txt file (https://www.zillow.com/robots.txt), which outlines rules for automated access. Not following these guidelines could lead to account suspension or a permanent ban.
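If you want to check programmatically whether a path is disallowed, Python's standard library includes a robots.txt parser. A small sketch (note that heavily protected sites may block even this fetch):
from urllib.robotparser import RobotFileParser

rp = RobotFileParser("https://www.zillow.com/robots.txt")
rp.read()
print(rp.can_fetch("*", "https://www.zillow.com/pr/2_p/"))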
Generally, scraping publicly available data is legal in many regions, but accessing private data—such as that which requires login or authentication—requires permission.
If you’re unsure about the legal aspects of your scraping activities, it’s advisable to seek legal advice from an attorney who is familiar with the laws in your area.
Conclusion
You've now completed our tutorial and have added another valuable skill to your scraping toolkit. You’ve learned about parsing, pagination, data storage, concurrency, and proxy integration. Now go ahead and create something awesome!
Interested in the tools covered in this guide? Check out the resources below:
More Python Web Scraping Guides
At ScrapeOps, we have a wealth of resources to help you improve your web scraping skills. Whether you’re just getting started or already a pro, you’ll find something useful.
If you’re looking to dive deeper, check out our Selenium Web Scraping Playbook or explore the following guides: