
Scrape Redfin With Python Selenium

How to Scrape Redfin With Selenium

If you follow the real estate market, you have probably heard of Redfin. It is a popular platform for browsing and shopping for property listings. For developers, though, pulling real estate data out of Redfin can be a challenge. That is the problem we are going to solve today.

In this tutorial, we will explain how to scrape real estate data from Redfin and collect all sorts of property details. Here is what we will cover in the coming sections:

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Redfin

You don't have to read the full tutorial to get started; you can simply use the pre-built scraper below.

  1. First, create a config.json file with your API key {"api_key": "your-super-secret-api-key"}.
  2. Then, add the code given below into a Python file.
import os  
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class PropertyData:
name: str = ""
bedrooms: int = 0
bathrooms: float = 0.0
square_feet: int = 0
price_differential: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty, set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

# Scrape search results function
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"

options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

tries = 0
success = False

while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(scrapeops_proxy_url)

logger.info("Waiting for page to load...")

# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))

# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")

for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1

finally:
driver.quit() # Ensure the driver is closed after each try

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

# Function to process a single listing using Selenium
def process_listing(driver, row, location, retries):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)

logger.info(f"Processing URL: {url}")

# Wait until the page is fully loaded
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))

# Extract bedroom information
try:
bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bedrooms = 0

# Extract bathroom information
try:
bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bathrooms = 0.0

# Extract square feet information
try:
size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
except NoSuchElementException:
square_feet = 0

# Extract price differential information
try:
difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
price_number = int(difference_holder.text.replace(",", ""))
color = difference_holder.get_attribute("class")
if "diffValue red" in color:
price_differential = -price_number
else:
price_differential = price_number
except NoSuchElementException:
price_differential = 0

# Create a new DataPipeline instance for each property
property_filename = f"{row['name'].replace(' ', '-')}.csv"
property_pipeline = DataPipeline(csv_filename=property_filename)

# Reset names_seen for the new pipeline instance
property_pipeline.names_seen = []

# Create a PropertyData instance
property_data = PropertyData(
name=row["name"],
bedrooms=bedrooms,
bathrooms=bathrooms,
square_feet=square_feet,
price_differential=price_differential
)

# Add property data to the pipeline and save to individual CSV
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()

logger.info(f"Successfully parsed property data: {asdict(property_data)}")
success = True

except TimeoutException:
logger.warning(f"Page load timeout for URL: {url}")
tries += 1
except Exception as e:
logger.error(f"Exception occurred while processing {url}: {e}")
tries += 1
finally:
if tries > retries:
logger.error(f"Max retries reached for URL: {url}")
raise Exception(f"Max retries exceeded for {url}")

def process_results(driver, csv_file, location, max_threads=5, retries=3):
logger.info(f"Processing results from {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
executor.map(
process_listing,
[driver] * len(reader),
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":
start_time = time.time()
MAX_RETRIES = 3
MAX_THREADS = 1
PAGES = 3
LOCATION = "us"

logger.info(f"Crawl starting...")

location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")

logger.info(f"Processing individual listings from CSV...")

options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
for file in aggregate_files:
process_results(driver, file, LOCATION, retries=MAX_RETRIES)

driver.quit()
logger.info(f"Crawl complete.")

end_time = time.time() # Record end time
execution_time = end_time - start_time
logger.info(f"Total execution time: {execution_time:.2f} seconds.")

Here is the command to run the Python script given above:

python name_of_your_script.py
  1. First, you'll receive a report named after the city you're scraping.
  2. Then, you'll get a separate CSV file for each listing in your crawl report.

You can modify the following settings to adjust your results:

  • MAX_THREADS: Limits the number of concurrent threads used during scraping.
  • MAX_RETRIES: Determines how many times a request will be retried if it fails (e.g., due to a timeout or a 500 error).
  • PAGES: Defines how many pages to scrape per location.
  • LOCATION: Specifies the geographic region (country) the request originates from.
  • location_list: A list of dictionaries containing details for each search area, such as city ID (id_number), state, and locality.
    If you decide to change location_list, make sure you get the id_number for your specific locality (a later section explains how to find it); see the example right after this list.
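
For instance, to crawl a second city you would add another dictionary to the list. The extra entry below is only illustrative; its id_number is a placeholder that you must replace with the real ID from that city's Redfin search URL.

location_list = [
    {"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"},
    # Placeholder entry -- look up the real id_number in the Redfin URL before using it.
    {"id_number": 0, "state": "WA", "locality": "Seattle"},
]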

How To Architect Our Redfin Scraper

When we scrape Redfin, as with many other projects in this "How To Scrape" series, we'll write both a crawler and a scraper.

Our crawler performs a search on Redfin, and our scraper then goes through each property we saved during the crawl and collects its details.

Here is the basic process for our crawler:

  1. Perform a search and parse the results.
  2. Control our results with pagination.
  3. Store our data in a CSV file.
  4. Concurrently run steps 1 through 3.
  5. Use proxy integration to get past anti-bots.

After our crawl, our scraper needs to do these tasks:

  1. Read our CSV file.
  2. Parse the data from each individual listing.
  3. Store this newly parsed data to a CSV.
  4. Concurrently run steps 2 and 3.
  5. Once again, utilize proxy integration.

Understanding How To Scrape Redfin

Before we scrape Redfin, it's important to understand both what data we're targeting and where that data lives on the page.

  • On the search pages, the data we need sits in a JSON blob embedded in the page.
  • On each individual listing page, the data we want is located directly in the HTML.

Step 1: How To Request Redfin Pages

Let's begin by requesting Redfin pages. As usual, when you retrieve a page, you need to make a GET request.

It's our job to go through this HTML and find the data we need.

The structure of Redfin search page URLs looks like this:

https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-2

The structure goes:

https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number}

In this example, the ID number is 12572. The state is South Carolina (SC), and the city is Myrtle Beach. We're on page 2.

Before we start scraping these pages, we need to gather the ID number, state, and city.

You can see all of this in the image below. Make sure to find the location's ID number before you begin scraping.

Search Results Page
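
If you want to assemble these search URLs yourself, a small helper along these lines reproduces the format described above (build_search_url() is just an illustrative name, not part of the final script):

# Hypothetical helper that reproduces the search URL format shown above.
def build_search_url(id_number, state, locality, page_number=1):
    formatted_locality = locality.replace(" ", "-")
    return f"https://www.redfin.com/city/{id_number}/{state}/{formatted_locality}/page-{page_number}"

print(build_search_url(12572, "SC", "Myrtle Beach", page_number=2))
# https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-2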

When we scrape individual property pages, we end up with URLs that look like this:

https://www.redfin.com/SC/Myrtle-Beach/1501-N-Ocean-Blvd-29577/unit-232/home/170856032

Their layout is:

https://www.redfin.com/{state}/{city}/{address}/unit-{unit_number}/home/{listing_id}

These variables will be much more difficult to recreate. So, instead of rebuilding them, we'll collect them while we do our web crawl.

List Page


Step 2: How To Extract Data From Redfin Results and Pages

As mentioned earlier, we'll pull data from a JSON blob embedded in the search results page, and we'll read data directly from the HTML elements on each listing page. Take a look at the images below to get a better understanding of how this works.

Here's an example of the JSON blob from a search results page.

HTML Inspection Search Results

You can find the bedroom count inside a div that has a data-rf-test-id labeled 'abp-beds'.
HTML Inspection Listing Page
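
To make the structure concrete, here is a rough sketch of the JSON our parser reads. The values below are invented, but the keys ("@type", "name", "offers", "url") are the ones we extract later:

import json

# A trimmed, made-up example of one entry in the ld+json array on a search page.
raw_blob = """[
    {"@type": "Product",
     "name": "1501 N Ocean Blvd Unit 232",
     "url": "https://www.redfin.com/SC/Myrtle-Beach/1501-N-Ocean-Blvd-29577/unit-232/home/170856032",
     "offers": {"price": 250000, "priceCurrency": "USD"}}
]"""

json_data = json.loads(raw_blob)
product = next((element for element in json_data if element["@type"] == "Product"), {})
print(product["name"], product["offers"]["price"], product["offers"]["priceCurrency"])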


Step 3: How To Control Pagination

Managing our pagination will be simple. Just keep in mind the URL format we talked about earlier.

https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number}

The key part of the URL is the end: page-{page_number}.

We'll use page_number+1 because Python's range() starts counting from 0, while our pages start at 1.
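
Putting that together, generating the URLs for the first few pages looks roughly like this:

id_number, state, city = 12572, "SC", "Myrtle-Beach"
pages = 3

for page_number in range(pages):
    # range() starts at 0, Redfin pages start at 1, hence the +1
    print(f"https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number + 1}")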


Step 4: Geolocated Data

We'll use the ScrapeOps Proxy API to manage our geolocation. This means that when we connect to the ScrapeOps server, we can include a "country" option and it will route us through a server in the country we pick.

For example, if we send {"country": "us"}, it will direct us through a server in the US.

You can check out the full list of supported countries here.
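
As a quick sketch, changing the region only requires changing the country value in the payload we send to the proxy (the full get_scrapeops_url() function appears later in this tutorial):

from urllib.parse import urlencode

payload = {
    "api_key": "your-super-secret-api-key",  # loaded from config.json in the real script
    "url": "https://www.redfin.com/city/12572/SC/Myrtle-Beach",
    "country": "us",  # swap in another code from the supported-country list linked above
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))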


Setting Up Our Redfin Scraper Project

Run the commands given below to get started:

Create a New Project Folder

mkdir redfin-scraper

cd redfin-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install selenium  
pip install webdriver-manager

Build A Redfin Search Crawler

Now that we have a clear plan, let's start building our crawler step by step. Here's what we'll do:

  • Write a simple script with a function to handle parsing.
  • Add pagination so our parser can go through multiple pages.
  • Set up a good way to store the data we collect.
  • Use multithreading to speed things up by running tasks at the same time.
  • Connect to the ScrapeOps Proxy API to get around anti-bot measures.

Step 1: Create Simple Search Data Parser

Let's kick things off with a simple script built around a single parsing function.

Our main aim here is straightforward: build a script with error handling, retry logic, and the parsing itself.

The code below shows how to do this. Pay extra attention to the parsing function, because that's where the interesting work happens.

import os  
import json
import logging
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(search_info, location, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}"
tries = 0
success = False

chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless if needed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

# Use webdriver-manager to automatically manage ChromeDriver
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)

while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Received response from: {url}")

script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.get_attribute("innerText"))
if type(json_data) != list:
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = {
"name": product["name"],
"price": product["offers"]["price"],
"price_currency": product["offers"]["priceCurrency"],
"url": product["url"]
}

print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
tries += 1

driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

# INPUT ---> List of keywords to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

# Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

scrape_search_results(search_area, LOCATION, retries=MAX_RETRIES)
aggregate_files.append(f"{filename}.csv")

logger.info(f"Crawl complete.")

In the code above, here’s what you should focus on:

  • location_list is a list of dictionaries describing the areas we want to search. We use a dictionary because each place needs three pieces of information: "id_number", "state", and "locality".
  • Next, we find all the JSON data embedded in the page using the CSS selector script[type='application/ld+json'].
  • We filter out everything that's not a "Product," so we’re left with just our listings to work with.
  • Finally, we grab the "name," "price," "price_currency," and "url" for each product.

Step 2: Add Pagination

Adding pagination to our crawler is really easy! We just need to add one little thing to the end of our URL and write a function to go through a list of pages.

Our URLs will now look like this: page-{page_number+1}. We use page_number+1 because the range() function starts counting from 0, while our pages start at 1.

So, here’s how our new URL format will look:

https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}

In this next part, you'll see start_scrape(), which kicks off our parsing function on a bunch of pages.

def start_scrape(search_info, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(search_info, location, page, retries=retries)

Here is the updated version of our Python script:

import os  
import json
import logging
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(search_info, location, page_number, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
tries = 0
success = False

chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless if needed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

# Use webdriver-manager to automatically manage ChromeDriver
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)

while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Received response from: {url}")

script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.get_attribute("innerText"))
if type(json_data) != list:
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = {
"name": product["name"],
"price": product["offers"]["price"],
"price_currency": product["offers"]["priceCurrency"],
"url": product["url"]
}

print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
tries += 1

driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(search_info, pages, location, retries=3):
for page in range(pages):
scrape_search_results(search_info, location, page, retries=retries)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"

logger.info(f"Crawl starting...")

# INPUT ---> List of keywords to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

# Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

start_scrape(search_area, PAGES, LOCATION, retries=MAX_RETRIES)
aggregate_files.append(f"{filename}.csv")

logger.info(f"Crawl complete.")
  • Our URLs now have a page number, so we can easily ask for specific pages.
  • The start_scrape() function helps us read through a list of pages.

Step 3: Storing the Scraped Data

Storing data properly is super important for any web scraping project. We need to find a way to represent the things we scrape and also save our data into a CSV file. We'll create two classes: SearchData and DataPipeline.

SearchData will represent each listing we scrape, while DataPipeline will help send these listings to a CSV file. Let’s check out SearchData—it contains all the info we gathered with our parsing function.

@dataclass
class SearchData:
    name: str = ""
    price: int = 0
    price_currency: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here’s the DataPipeline we use to save the objects mentioned above in a CSV file.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
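
Before wiring these classes into the crawler, here is a minimal usage sketch. The listing values are made up, and test-output.csv is just an example filename:

# Queue a single made-up listing, then flush it to CSV.
pipeline = DataPipeline(csv_filename="test-output.csv", storage_queue_limit=50)
pipeline.add_data(SearchData(
    name="1501 N Ocean Blvd Unit 232",
    price=250000,
    price_currency="USD",
    url="https://www.redfin.com/SC/Myrtle-Beach/1501-N-Ocean-Blvd-29577/unit-232/home/170856032"
))
pipeline.close_pipeline()  # writes anything left in the queue to test-output.csv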

In the code below, we start by opening a DataPipeline and sending it to the start_scrape() function.

After that, we take all the data we collected and change it into SearchData, which we then send into the DataPipeline.

import os  
import csv
import json
import logging
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"

options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

tries = 0
success = False

while tries <= retries and not success:
try:
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(url)

logger.info("Waiting for page to load...")

# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))

# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")

for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except TimeoutException:
logger.error(f"Timeout while waiting for page: {url}")
tries += 1

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1

finally:
driver.quit() # Ensure the driver is closed after each try

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(search_info, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(search_info, location, page, data_pipeline=data_pipeline, retries=retries)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

We use SearchData to represent the individual results from the search. We then feed these objects into our DataPipeline, which saves them to a CSV file.


Step 4: Adding Concurrency

Next, we want to scrape several pages at the same time. To do this, we’ll use ThreadPoolExecutor, which will take the place of our for loop that goes through the list of pages.

Here’s our updated start_scrape() function.

def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [search_info] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

Check out the arguments we pass to executor.map() (there's a short sketch of how this pairing works right after this list):

  • scrape_search_results: the function we want to run on each thread.
  • [search_info] * pages: a list that repeats our search criteria once per page.
  • [location] * pages: a list that repeats the country code once per page.
  • range(pages): the page numbers themselves.
  • [data_pipeline] * pages: a list that repeats our shared DataPipeline once per page.
  • [retries] * pages: a list that repeats the retry limit once per page.
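
Here is a short, self-contained sketch of how that pairing works; fake_scrape() is just a stand-in for our real parsing function. executor.map() expects one iterable per argument, all the same length, and uses the Nth element of each iterable as the arguments for the Nth call:

import concurrent.futures

def fake_scrape(search_info, location, page_number, data_pipeline, retries):
    # Stand-in worker: just report which page it was asked to handle.
    return f"{search_info['locality']} page {page_number + 1} via {location}"

pages = 3
search_info = {"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        fake_scrape,
        [search_info] * pages,   # same search_info for every page
        ["us"] * pages,          # same country code for every page
        range(pages),            # 0, 1, 2 -> pages 1, 2, 3
        [None] * pages,          # placeholder for the shared DataPipeline
        [3] * pages,             # same retry count for every page
    )
    for line in results:
        print(line)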

And here’s the updated code!

import os  
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"

options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

tries = 0
success = False

while tries <= retries and not success:
try:
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(url)

logger.info("Waiting for page to load...")

# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))

# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")

for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1

finally:
driver.quit() # Ensure the driver is closed after each try

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")

logger.info(f"Crawl complete.")

We can now crawl a list of pages concurrently.


Step 5: Bypassing Anti-Bots

Anti-bots are tools that help spot and block traffic that isn't from real people. While they mainly aim at harmful software, they often mistakenly block scrapers too. To get around these anti-bots, we need to use a proxy.

To do this, we’ll create a function that takes a regular URL and gives back a URL that goes through the ScrapeOps Proxy API. We’ll send a message to ScrapeOps with the following details:

  • "api_key": your ScrapeOps API key.
  • "url": the URL you want to scrape.
  • "country": the country you want to appear to be in.
  • "wait": how long we want ScrapeOps to pause before sending back the response. This helps the page fully load.
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 3000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
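
For example, wrapping one of our search URLs produces a proxied URL roughly like the comment below (assuming API_KEY has been loaded from config.json):

print(get_scrapeops_url("https://www.redfin.com/city/12572/SC/Myrtle-Beach", location="us"))
# Roughly: https://proxy.scrapeops.io/v1/?api_key=<your-key>&url=https%3A%2F%2Fwww.redfin.com%2F...&country=us&wait=3000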

In the final version of our crawler below, we run each URL through this proxy function before requesting it.

import os  
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"

options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

tries = 0
success = False

while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(scrapeops_proxy_url)

logger.info("Waiting for page to load...")

# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))

# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")

for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1

finally:
driver.quit() # Ensure the driver is closed after each try

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")

logger.info(f"Crawl complete.")

Step 6: Production Run

Let's get our crawler up and running for testing! We're going to crawl three pages.

Check out our main settings below, and you can tweak any of these options if you want:

  • MAX_RETRIES
  • MAX_THREADS
  • PAGES
  • LOCATION
  • location_list
if __name__ == "__main__":
    start_time = time.time()  # Start time

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

Our crawl finished in 64.54 seconds. 64.54 seconds / 3 pages = 21.51 seconds per page.


Build A Redfin Scraper

Now that we have a working crawler, we need to build the scraper that goes with it.

First, our scraper will read the CSV file made by the crawler. Then, it will go through each listing from our results and scrape them at the same time.


Step 1: Create Simple Listing Data Parser

To gather our listings, we'll start by making a function to parse the data.

Just like before, we'll include some error handling and a way to try again if something goes wrong. Remember to focus on the parsing part, as it's important!

Here is our process_listing() function.

def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    options = Options()
    options.add_argument("--headless=new")  # Use 'new' headless mode for Chrome
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--disable-gpu")
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
            driver.get(url)

            logger.info("Waiting for page to load...")

            # Increase the wait time for the page to load
            WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))

            bedrooms = 0
            bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
            if bedroom_holder:
                bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))

            bathrooms = 0.0
            bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
            if bathroom_holder:
                bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))

            square_feet = 0
            size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
            if size_holder:
                square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))

            price_differential = 0
            difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
            if difference_holder:
                price_number = int(difference_holder.text.replace(",", ""))
                color = difference_holder.get_attribute("class")
                if color == "diffValue red":
                    price_differential = -price_number
                else:
                    price_differential = price_number

            property_data = {
                "name": row["name"],
                "bedrooms": bedrooms,
                "bathrooms": bathrooms,
                "square_feet": square_feet,
                "price_differential": price_differential
            }

            print(property_data)
            success = True
            logger.info(f"Successfully parsed: {row['url']}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries - tries}")
            tries += 1
        finally:
            driver.quit()  # Ensure the driver is closed after each try

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
  • We look for a bedroom section using driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']").
  • If there are bedrooms listed on the page, we grab that info.
  • Next, we do the same for the bathrooms by checking driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']").
  • We also check the size by looking for driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']") and pull the value if it’s there.
  • Finally, we check the price difference with driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']") and get that information too.
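
One caveat about the lookups above: driver.find_element() raises NoSuchElementException when an element is missing rather than returning None, so the if holder: checks only help when the element exists. If you'd rather fall back to defaults without triggering a retry, a small helper like the hypothetical get_stat_text() below (built on find_elements(), which returns an empty list when nothing matches) is one way to do it:

from selenium.webdriver.common.by import By

def get_stat_text(driver, css_selector, default="0"):
    # Return the statsValue text for a stat block, or a default if the block is missing.
    holders = driver.find_elements(By.CSS_SELECTOR, css_selector)
    if not holders:
        return default
    values = holders[0].find_elements(By.CLASS_NAME, "statsValue")
    return values[0].text if values else default

# e.g. bedrooms = int(get_stat_text(driver, "div[data-rf-test-id='abp-beds']").replace("—", "0"))

The complete code later in this article handles the same situation with try/except NoSuchElementException blocks, which achieves the same goal.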

Step 2: Loading URLs To Scrape

When we use our parsing function, we need to give it a URL. Our crawler grabs a lot of URLs every time it runs. To get these URLs into our parser, we need to create another function like start_scrape().

We’ll call this new one process_results().

def process_results(csv_file, location, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_listing(row, location, retries=retries)
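
For example, once the crawl has produced Myrtle-Beach.csv (the filename comes from the locality we searched earlier), the listing scraper can be run over it like this:

# Walk every listing URL saved by the crawl, retrying each up to 3 times.
process_results("Myrtle-Beach.csv", "us", retries=3)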

You can check out how everything comes together in the complete code we’ve shared below.

import os  
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

# Scrape search results function (unchanged)
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
    formatted_locality = search_info["locality"].replace(" ", "-")
    url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"

    options = Options()
    options.add_argument("--headless=new")  # Use 'new' headless mode for Chrome
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--disable-gpu")
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Use the ScrapeOps proxy URL
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
            driver.get(scrapeops_proxy_url)

            logger.info("Waiting for page to load...")

            # Increase the wait time for the page to load
            WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))

            # Once we find the script tag, extract its content
            script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
            if not script_tags:
                raise Exception("No script tags found on the page.")

            for script in script_tags:
                json_data = json.loads(script.get_attribute('innerText'))
                if not isinstance(json_data, list):
                    continue

                product = {}
                for element in json_data:
                    if element["@type"] == "Product":
                        product = element
                        break

                search_data = SearchData(
                    name=product["name"],
                    price=product["offers"]["price"],
                    price_currency=product["offers"]["priceCurrency"],
                    url=product["url"]
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            tries += 1

        finally:
            driver.quit()  # Ensure the driver is closed after each try

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [search_info] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

# New function to process a single listing using Selenium
def process_listing(driver, row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Use the ScrapeOps proxy URL
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)

            logger.info(f"Processing URL: {url}")

            # Wait until the page is fully loaded
            WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))

            # Extract bedroom information
            try:
                bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
                bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
            except NoSuchElementException:
                bedrooms = 0

            # Extract bathroom information
            try:
                bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
                bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
            except NoSuchElementException:
                bathrooms = 0.0

            # Extract square feet information
            try:
                size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
                square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
            except NoSuchElementException:
                square_feet = 0

            # Extract price differential information
            try:
                difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
                price_number = int(difference_holder.text.replace(",", ""))
                color = difference_holder.get_attribute("class")
                if "diffValue red" in color:
                    price_differential = -price_number
                else:
                    price_differential = price_number
            except NoSuchElementException:
                price_differential = 0

            # Construct the property data dictionary
            property_data = {
                "name": row["name"],
                "bedrooms": bedrooms,
                "bathrooms": bathrooms,
                "square_feet": square_feet,
                "price_differential": price_differential
            }

            logger.info(f"Successfully parsed property data: {property_data}")
            success = True

        except TimeoutException:
            logger.warning(f"Page load timeout for URL: {url}")
            tries += 1
        except Exception as e:
            logger.error(f"Exception occurred while processing {url}: {e}")
            tries += 1
        finally:
            if tries > retries:
                logger.error(f"Max retries reached for URL: {url}")
                raise Exception(f"Max retries exceeded for {url}")

# New function to process the results from a CSV
def process_results(driver, csv_file, location, retries=3):
    logger.info(f"Processing results from {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_listing(driver, row, location, retries=retries)

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Processing individual listings from CSV...")

    options = Options()
    options.add_argument("--headless=new")  # Use 'new' headless mode for Chrome
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    for file in aggregate_files:
        process_results(driver, file, LOCATION, retries=MAX_RETRIES)

    driver.quit()
    logger.info(f"Crawl complete.")

  • The process_results() function opens our CSV file and iterates through every row, applying the process_listing() function to each one.

Step 3: Storing the Scraped Data

Right now, storing our data is straightforward. We just need to create a new dataclass called PropertyData. It's similar to our SearchData, but it holds different fields.

@dataclass
class PropertyData:
    name: str = ""
    bedrooms: int = 0
    bathrooms: float = 0.0
    square_feet: int = 0
    price_differential: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty, set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
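
If you want a quick feel for what check_string_fields() does, here is a small, optional sanity check. It isn't part of the scraper; it simply assumes the PropertyData class above has already been defined, and shows that empty strings get a placeholder while numeric fields are left alone.

# Optional check: empty name gets a default, numeric fields keep their values.
example = PropertyData(name="", bedrooms=3, bathrooms=2.5, square_feet=1450)

print(example.name)                # "No name"
print(example.bedrooms)            # 3
print(example.price_differential)  # 0 (the dataclass default)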

In our complete code, we’re now starting a DataPipeline and sending these new PropertyData objects into it.

import os  
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class PropertyData:
name: str = ""
bedrooms: int = 0
bathrooms: float = 0.0
square_feet: int = 0
price_differential: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty, set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

# Scrape search results function
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"

options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

tries = 0
success = False

while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(scrapeops_proxy_url)

logger.info("Waiting for page to load...")

# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))

# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")

for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1

finally:
driver.quit() # Ensure the driver is closed after each try

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

# Function to process a single listing using Selenium
def process_listing(driver, row, location, retries):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
try:
driver.get(url)

logger.info(f"Processing URL: {url}")

# Wait until the page is fully loaded
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))

# Extract bedroom information
try:
bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bedrooms = 0

# Extract bathroom information
try:
bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bathrooms = 0.0

# Extract square feet information
try:
size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
except NoSuchElementException:
square_feet = 0

# Extract price differential information
try:
difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
price_number = int(difference_holder.text.replace(",", ""))
color = difference_holder.get_attribute("class")
if "diffValue red" in color:
price_differential = -price_number
else:
price_differential = price_number
except NoSuchElementException:
price_differential = 0

# Create a new DataPipeline instance for each property
property_filename = f"{row['name'].replace(' ', '-')}.csv"
property_pipeline = DataPipeline(csv_filename=property_filename)

# Reset names_seen for the new pipeline instance
property_pipeline.names_seen = []

# Create a PropertyData instance
property_data = PropertyData(
name=row["name"],
bedrooms=bedrooms,
bathrooms=bathrooms,
square_feet=square_feet,
price_differential=price_differential
)

# Add property data to the pipeline and save to individual CSV
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()

logger.info(f"Successfully parsed property data: {asdict(property_data)}")
success = True

except TimeoutException:
logger.warning(f"Page load timeout for URL: {url}")
tries += 1
except Exception as e:
logger.error(f"Exception occurred while processing {url}: {e}")
tries += 1
finally:
if tries > retries:
logger.error(f"Max retries reached for URL: {url}")
raise Exception(f"Max retries exceeded for {url}")

def process_results(driver, csv_file, location, retries):
logger.info(f"Processing results from {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_listing(driver, row, location, retries=retries)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")

logger.info(f"Processing individual listings from CSV...")

options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
for file in aggregate_files:
process_results(driver, file, LOCATION, retries=MAX_RETRIES)

driver.quit()
logger.info(f"Crawl complete.")

Inside our parsing function, we now open a new DataPipeline for each listing and add a PropertyData object to it, so each property gets its own separate report.
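
Stripped of the Selenium details, the per-property storage pattern looks roughly like this. It's only a sketch: it assumes the PropertyData and DataPipeline classes from above, and the row dictionary and the numbers are made up for illustration.

# One pipeline (and therefore one CSV file) per listing. The values below are
# placeholders standing in for what process_listing() scrapes from the page.
row = {"name": "123 Example St"}

property_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
property_data = PropertyData(
    name=row["name"],
    bedrooms=3,
    bathrooms=2.0,
    square_feet=1500,
    price_differential=-5000
)

property_pipeline.add_data(property_data)  # queued and deduplicated by name
property_pipeline.close_pipeline()         # flushes the queue to 123-Example-St.csv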


Step 4: Adding Concurrency

We'll use ThreadPoolExecutor to run our tasks concurrently, just like we did before. We only need to swap out a for loop.

def process_results(driver, csv_file, location, max_threads=5, retries=3):
    logger.info(f"Processing results from {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_listing,
            [driver] * len(reader),
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )

The arguments are much the same as before: process_listing is the function we want each available thread to run.

All of its other arguments are passed in as lists, and executor.map() unpacks those lists into the individual calls to process_listing.
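
If that calling convention feels unfamiliar, the toy example below (completely unrelated to Redfin, with made-up values) shows how executor.map() pairs the lists up: the first element of every list forms the first call, the second elements form the second call, and so on.

import concurrent.futures

def label(name, city, price):
    # Each call receives one element from each of the lists passed to map().
    return f"{name} in {city}: ${price}"

names = ["Listing A", "Listing B", "Listing C"]
cities = ["Myrtle Beach", "Conway", "Surfside Beach"]
prices = [250000, 310000, 275000]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(label, names, cities, prices))

print(results)
# ['Listing A in Myrtle Beach: $250000', 'Listing B in Conway: $310000', 'Listing C in Surfside Beach: $275000']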


Step 5: Bypassing Anti-Bots

We already have the tools we need to get past Redfin's anti-bot measures; we just need to call our proxy function in the right place. Only one line in process_listing() needs to change.

The line below is the key to making it all work:

scrapeops_proxy_url = get_scrapeops_url(url, location=location)
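
In practice, that means the plain driver.get(url) call from the previous version of process_listing() now goes through the proxy instead. A minimal before/after sketch of just that change:

# Before (Steps 3-4): the driver requested the listing page directly.
# driver.get(url)

# After (Step 5): the same request is routed through the ScrapeOps proxy.
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)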

Our complete code is now ready to run in production.

import os  
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class PropertyData:
name: str = ""
bedrooms: int = 0
bathrooms: float = 0.0
square_feet: int = 0
price_differential: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty, set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

# Scrape search results function
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"

options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

tries = 0
success = False

while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(scrapeops_proxy_url)

logger.info("Waiting for page to load...")

# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))

# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")

for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1

finally:
driver.quit() # Ensure the driver is closed after each try

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)

# Function to process a single listing using Selenium
def process_listing(driver, row, location, retries):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)

logger.info(f"Processing URL: {url}")

# Wait until the page is fully loaded
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))

# Extract bedroom information
try:
bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bedrooms = 0

# Extract bathroom information
try:
bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bathrooms = 0.0

# Extract square feet information
try:
size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
except NoSuchElementException:
square_feet = 0

# Extract price differential information
try:
difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
price_number = int(difference_holder.text.replace(",", ""))
color = difference_holder.get_attribute("class")
if "diffValue red" in color:
price_differential = -price_number
else:
price_differential = price_number
except NoSuchElementException:
price_differential = 0

# Create a new DataPipeline instance for each property
property_filename = f"{row['name'].replace(' ', '-')}.csv"
property_pipeline = DataPipeline(csv_filename=property_filename)

# Reset names_seen for the new pipeline instance
property_pipeline.names_seen = []

# Create a PropertyData instance
property_data = PropertyData(
name=row["name"],
bedrooms=bedrooms,
bathrooms=bathrooms,
square_feet=square_feet,
price_differential=price_differential
)

# Add property data to the pipeline and save to individual CSV
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()

logger.info(f"Successfully parsed property data: {asdict(property_data)}")
success = True

except TimeoutException:
logger.warning(f"Page load timeout for URL: {url}")
tries += 1
except Exception as e:
logger.error(f"Exception occurred while processing {url}: {e}")
tries += 1
finally:
if tries > retries:
logger.error(f"Max retries reached for URL: {url}")
raise Exception(f"Max retries exceeded for {url}")

def process_results(driver, csv_file, location, max_threads=5, retries=3):
logger.info(f"Processing results from {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
executor.map(
process_listing,
[driver] * len(reader),
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":
start_time = time.time()
MAX_RETRIES = 3
MAX_THREADS = 1
PAGES = 3
LOCATION = "us"

logger.info(f"Crawl starting...")

location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")

logger.info(f"Processing individual listings from CSV...")

options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
for file in aggregate_files:
process_results(driver, file, LOCATION, retries=MAX_RETRIES)

driver.quit()
logger.info(f"Crawl complete.")

end_time = time.time() # Record end time
execution_time = end_time - start_time
logger.info(f"Total execution time: {execution_time:.2f} seconds.")

Step 6: Production Run

Check out our main section. Just like before, we’ll be doing a 3-page crawl.

if __name__ == "__main__":
    start_time = time.time()
    MAX_RETRIES = 3
    MAX_THREADS = 1
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Processing individual listings from CSV...")

    options = Options()
    options.add_argument("--headless=new")  # Use 'new' headless mode for Chrome
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    for file in aggregate_files:
        process_results(driver, file, LOCATION, retries=MAX_RETRIES)

    driver.quit()
    logger.info(f"Crawl complete.")

    end_time = time.time()  # Record end time
    execution_time = end_time - start_time
    logger.info(f"Total execution time: {execution_time:.2f} seconds.")

We generated a report with 120 results.

If you remember from earlier, our 3-page crawl took 21.51 seconds, while the full crawl and scrape took 4692.13 seconds. 4692.13 - 21.51 = 4670.62 seconds spent scraping, and 4670.62 seconds / 120 results ≈ 38.92 seconds per result.
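
If you'd like to reproduce that back-of-the-envelope math yourself:

# Numbers taken from the run above.
crawl_seconds = 21.51
total_seconds = 4692.13
results = 120

scrape_seconds = total_seconds - crawl_seconds   # 4670.62
print(round(scrape_seconds / results, 2))        # 38.92 seconds per result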


When scraping a website, you must follow its Terms of Use and robots.txt guidelines.

You can view Redfin's Terms of Use on their website. Their robots.txt file is available at https://www.redfin.com/robots.txt.

Violating these rules could lead to your account being suspended or even permanently deleted.

In this guide, we only scraped publicly available data.

Courts have generally held that scraping publicly available data is legal. Scraping private data (data gated behind a login) is a completely different story.

If you're unsure of the legality of your scraper, contact an attorney.


Conclusion

You now know how to crawl and scrape Redfin. You've had a crash course in Selenium, and you should have a solid understanding of our iterative build process. You should also know how to pull data out of embedded JSON and how to extract it directly from an HTML page.

If you're interested in any of the tech used in this article, check out these links.


More Python Web Scraping Guides

At ScrapeOps, we wrote the playbook on scraping with Python Selenium. Whether you're brand new to coding or a seasoned developer, we've got something to help take your skills to the next level. Take this new knowledge and go build something.

If you're interested in learning more from our "How To Scrape" series, take a look at the links below.