How to Scrape Redfin With Selenium
If you work in real estate, you have probably heard of Redfin. It is a popular platform for browsing and shopping for properties. For developers, however, scraping real estate data from Redfin can be challenging. That is the problem we are going to solve today.
In this tutorial, we will explain how to scrape real estate data from Redfin and collect all sorts of property details. Let's take a look at what we will cover in the coming sections:
- TLDR: How to Scrape Redfin
- How To Architect Our Scraper
- Understanding How To Scrape Redfin
- Setting Up Our Redfin Scraper
- Build A Redfin Search Crawler
- Build A Redfin Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Redfin
You don't have to spend hours reading the tutorial below; you can simply use the pre-built scraper.
- First, create a config.json file containing your API key: {"api_key": "your-super-secret-api-key"}.
- Then, add the code given below into a Python file.
import os
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class PropertyData:
name: str = ""
bedrooms: int = 0
bathrooms: float = 0.0
square_feet: int = 0
price_differential: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty, set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
# Scrape search results function
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")
tries = 0
success = False
while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(scrapeops_proxy_url)
logger.info("Waiting for page to load...")
# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))
# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")
for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue
product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break
search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1
finally:
driver.quit() # Ensure the driver is closed after each try
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
# Function to process a single listing using Selenium
def process_listing(driver, row, location, retries):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Processing URL: {url}")
# Wait until the page is fully loaded
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))
# Extract bedroom information
try:
bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bedrooms = 0
# Extract bathroom information
try:
bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bathrooms = 0.0
# Extract square feet information
try:
size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
except NoSuchElementException:
square_feet = 0
# Extract price differential information
try:
difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
price_number = int(difference_holder.text.replace(",", ""))
color = difference_holder.get_attribute("class")
if "diffValue red" in color:
price_differential = -price_number
else:
price_differential = price_number
except NoSuchElementException:
price_differential = 0
# Create a new DataPipeline instance for each property
property_filename = f"{row['name'].replace(' ', '-')}.csv"
property_pipeline = DataPipeline(csv_filename=property_filename)
# Reset names_seen for the new pipeline instance
property_pipeline.names_seen = []
# Create a PropertyData instance
property_data = PropertyData(
name=row["name"],
bedrooms=bedrooms,
bathrooms=bathrooms,
square_feet=square_feet,
price_differential=price_differential
)
# Add property data to the pipeline and save to individual CSV
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()
logger.info(f"Successfully parsed property data: {asdict(property_data)}")
success = True
except TimeoutException:
logger.warning(f"Page load timeout for URL: {url}")
tries += 1
except Exception as e:
logger.error(f"Exception occurred while processing {url}: {e}")
tries += 1
finally:
if tries > retries:
logger.error(f"Max retries reached for URL: {url}")
raise Exception(f"Max retries exceeded for {url}")
def process_results(driver, csv_file, location, max_threads=5, retries=3):
logger.info(f"Processing results from {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
executor.map(
process_listing,
[driver] * len(reader),
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
start_time = time.time()
MAX_RETRIES = 3
MAX_THREADS = 1
PAGES = 3
LOCATION = "us"
logger.info(f"Crawl starting...")
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Processing individual listings from CSV...")
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
for file in aggregate_files:
process_results(driver, file, LOCATION, retries=MAX_RETRIES)
driver.quit()
logger.info(f"Crawl complete.")
end_time = time.time() # Record end time
execution_time = end_time - start_time
logger.info(f"Total execution time: {execution_time:.2f} seconds.")
Here is the command to run the Python script given above:
python name_of_your_script.py
- First, you'll receive a report named after the city you're scraping.
- Then, you'll get a separate CSV file for each listing in your crawl report.
You can modify the following settings to adjust your results:
- MAX_THREADS: Limits the number of concurrent threads used during scraping.
- MAX_RETRIES: Determines how many times a request will be retried if it fails (e.g., due to a timeout or a 500 error).
- PAGES: Defines how many pages to scrape per location.
- LOCATION: Specifies the geographic region (country) the request originates from.
- location_list: A list of dictionaries containing details for each search area, such as city ID (id_number), state, and locality.
If you decide to change the location_list, ensure you get the id_number for your specific locality. We have a section that explains how to do this.
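For example, a minimal tweak to the runner block might look like the sketch below. The Myrtle Beach id_number (12572) comes from this tutorial; any other locality needs its own id_number looked up first:
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 1   # keep this low: each task launches its own Chrome instance
    PAGES = 5         # scrape five pages per search area instead of three
    LOCATION = "us"
    # Each entry needs the Redfin city id_number, state code, and locality name.
    location_list = [
        {"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}
    ]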
How To Architect Our Redfin Scraper
As with many of our other projects in this "How To Scrape" series, scraping Redfin means writing both a crawler and a scraper.
Our crawler will perform a search on Redfin and our scraper is going to go through and scrape details for all of the properties that we saved during the crawl.
Here is the basic process for our crawler:
- Perform a search and parse the results.
- Control our results with pagination.
- Store our data in a CSV file.
- Concurrently run steps 1 through 3.
- Use proxy integration to get past anti-bots.
After our crawl, our scraper needs to do these tasks:
- Read our CSV file.
- Parse the data from each individual listing.
- Store this newly parsed data to a CSV.
- Concurrently run steps 2 and 3.
- Once again, utilize proxy integration.
Understanding How To Scrape Redfin
Before we scrape anything, it is important to understand what data we are targeting and where it lives on the page.
- On the search pages, the data we need sits in a JSON blob.
- On each individual listing page, our desired data is located inside the HTML.
Step 1: How To Request Redfin Pages
Let's begin by requesting Redfin pages. As usual, when you retrieve a page, you need to make a GET request.
It's our job to go through this HTML and find the data we need.
The structure of Redfin search page URLs looks like this:
https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-2
The structure goes:
https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number}
In this example, the ID number is 12572. The state is South Carolina (SC), and the city is Myrtle Beach. We're on page 2.
Before we start scraping these pages, we need to gather the ID number, state, and city.
You can see all of this in the image below. Make sure to find the location's ID number before you begin scraping.
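As a quick sketch, here is how those three pieces combine into a search page URL:
id_number = 12572
state = "SC"
locality = "Myrtle Beach"

# Spaces in the locality become dashes in the URL path.
formatted_locality = locality.replace(" ", "-")
url = f"https://www.redfin.com/city/{id_number}/{state}/{formatted_locality}/page-2"
print(url)  # https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-2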
When we scrape individual property pages, we end up with URLs that look like this:
https://www.redfin.com/SC/Myrtle-Beach/1501-N-Ocean-Blvd-29577/unit-232/home/170856032
Their layout is:
https://www.redfin.com/{state}/{city}/{address}/unit-{unit_number}/home/{listing_id}
These variables will be much more difficult to recreate. So, instead of rebuilding them, we'll collect them while we do our web crawl.
Step 2: How To Extract Data From Redfin Results and Pages
As mentioned earlier, we'll be pulling data from a JSON blob embedded in the search results page, and we'll also pull data directly from the HTML elements on each listing page. Take a look at the images below to get a better understanding of how this works.
Here's an example of the JSON blob from the search results.
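To make this concrete, here is a minimal sketch of how we pick the "Product" entry out of that ld+json blob. The sample payload below is hypothetical and only shows the fields we rely on:
import json

# Hypothetical ld+json payload shaped like the one on the search page.
raw = """[
    {"@type": "BreadcrumbList"},
    {"@type": "Product",
     "name": "123 Example St Unit 1",
     "url": "https://www.redfin.com/SC/Myrtle-Beach/example",
     "offers": {"price": 250000, "priceCurrency": "USD"}}
]"""

json_data = json.loads(raw)
# Keep only the element whose @type is "Product".
product = next((el for el in json_data if el.get("@type") == "Product"), {})
if product:
    print(product["name"], product["offers"]["price"], product["offers"]["priceCurrency"])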
You can find the bedroom count inside a div that has a data-rf-test-id labeled 'abp-beds'.
Step 3: How To Control Pagination
Managing our pagination will be simple. Just keep in mind the URL format we talked about earlier.
https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number}
The key part of the URL is the ending: page-{page_number}.
We'll use page_number+1 because Python's range() starts counting from 0, while our pages start at 1.
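A quick illustration of that off-by-one adjustment:
pages = 3
for page_number in range(pages):          # yields 0, 1, 2
    print(f".../page-{page_number + 1}")  # requests page-1, page-2, page-3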
Step 4: Geolocated Data
We'll use the ScrapeOps Proxy API to manage our geolocation. This means that when we connect to the ScrapeOps server, we can include a "country" option and it will route us through a server in the country we pick.
For example, if we send {"country": "us"}
, it will direct us through a server in the US.
You can check out the full list of supported countries here.
Setting Up Our Redfin Scraper Project
Run the commands given below to get started:
Create a New Project Folder
mkdir redfin-scraper
cd redfin-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
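On Windows, activate the environment with this command instead:
venv\Scripts\activate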
Install Our Dependencies
pip install selenium
pip install webdriver-manager
Build A Redfin Search Crawler
Now that we have a clear plan, let's start building our crawler step by step. Here's what we'll do:
- Write a simple script with a function to handle parsing.
- Add pagination so our parser can go through multiple pages.
- Set up a good way to store the data we collect.
- Use multithreading to speed things up by running tasks at the same time.
- Connect to the ScrapeOps Proxy API to get around anti-bot measures.
Step 1: Create Simple Search Data Parser
Let's kick things off by creating a simple script built around a parsing function.
Our main aim here is straightforward: build a script that handles errors, retries failed requests, and performs the actual parsing.
The code below shows how to do this. Pay extra attention to the parsing function, because that's where the real work happens.
import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(search_info, location, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}"
tries = 0
success = False
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless if needed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
# Use webdriver-manager to automatically manage ChromeDriver
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Received response from: {url}")
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.get_attribute("innerText"))
if type(json_data) != list:
continue
product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break
search_data = {
"name": product["name"],
"price": product["offers"]["price"],
"price_currency": product["offers"]["priceCurrency"],
"url": product["url"]
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
tries += 1
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
# Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
scrape_search_results(search_area, LOCATION, retries=MAX_RETRIES)
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
In the code above, here’s what you should focus on:
- location_list is a list of dictionaries describing the areas we want to search. We use a dictionary because each place has three important details we need: "id_number", "state", and "locality".
- Next, we find all the JSON data embedded in the page by using this CSS selector: script[type='application/ld+json'].
- We filter out everything that's not a "Product," so we’re left with just our listings to work with.
- Finally, we grab the "name," "price," "price_currency," and "url" for each product.
Step 2: Add Pagination
Adding pagination to our crawler is really easy! We just need to add one little thing to the end of our URL and write a function to go through a list of pages.
Our URLs will now end with page-{page_number+1}. We use page_number+1 because the range() function starts counting from 0, while our pages start at 1.
So, here’s how our new URL format will look:
https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}
In this next part, you'll see start_scrape(), which runs our parsing function across a list of pages.
def start_scrape(search_info, pages, location, retries=3):
for page in range(pages):
scrape_search_results(search_info, location, page, retries=retries)
Here is the updated version of our Python script:
import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(search_info, location, page_number, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
tries = 0
success = False
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless if needed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
# Use webdriver-manager to automatically manage ChromeDriver
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Received response from: {url}")
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.get_attribute("innerText"))
if type(json_data) != list:
continue
product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break
search_data = {
"name": product["name"],
"price": product["offers"]["price"],
"price_currency": product["offers"]["priceCurrency"],
"url": product["url"]
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
tries += 1
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(search_info, pages, location, retries=3):
for page in range(pages):
scrape_search_results(search_info, location, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
# Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
start_scrape(search_area, PAGES, LOCATION, retries=MAX_RETRIES)
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- Our URLs now include a page number, so we can easily request specific pages.
- The start_scrape() function lets us crawl through a list of pages.
Step 3: Storing the Scraped Data
Storing data properly is essential for any web scraping project. We need a way to represent the items we scrape and a way to save them into a CSV file. We'll create two classes: SearchData and DataPipeline.
SearchData will represent each listing we scrape, while DataPipeline will pipe these listings into a CSV file. Let's look at SearchData first; it contains all the info we gathered with our parsing function.
@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
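As a quick sketch of what __post_init__ does here (the address is made up for illustration):
item = SearchData(name="  123 Example St  ", price=250000, price_currency="USD", url="")
print(item.name)  # "123 Example St"  (strings are stripped)
print(item.url)   # "No url"          (empty strings get a placeholder)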
Here’s the DataPipeline we use to save the objects mentioned above in a CSV file.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
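Here is a minimal usage sketch of the pipeline on its own; the filename and listing are made up for illustration:
pipeline = DataPipeline(csv_filename="example-output.csv")
listing = SearchData(name="123 Example St", price=250000, price_currency="USD",
                     url="https://www.redfin.com/example")
pipeline.add_data(listing)
pipeline.add_data(listing)      # duplicate name, logged and dropped
pipeline.close_pipeline()       # flushes anything left in the queue to the CSV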
In the code below, we open a DataPipeline and pass it into the start_scrape() function.
After that, we convert all the data we collect into SearchData objects, which we then feed into the DataPipeline.
import os
import csv
import json
import logging
import time
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from webdriver_manager.chrome import ChromeDriverManager
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(url)
logger.info("Waiting for page to load...")
# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))
# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")
for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue
product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break
search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except TimeoutException:
logger.error(f"Timeout while waiting for page: {url}")
tries += 1
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1
finally:
driver.quit() # Ensure the driver is closed after each try
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(search_info, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(search_info, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
We use SearchData to represent the actual results from the search. Then, we send these results into our DataPipeline, where they are saved to a CSV file.
Step 4: Adding Concurrency
Next, we want to scrape several pages at the same time. To do this, we'll use ThreadPoolExecutor, which takes the place of the for loop that went through our list of pages.
Here's our updated start_scrape() function.
def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
Check out the arguments we pass to executor.map():
- scrape_search_results: the function we want to run on each thread.
- [search_info] * pages: the search information, repeated once per page.
- [location] * pages: the location, repeated once per page.
- range(pages): the list of page numbers.
- [data_pipeline] * pages: the data pipeline, repeated once per page.
- [retries] * pages: the retry limit, repeated once per page.
And here’s the updated code!
import os
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(url)
logger.info("Waiting for page to load...")
# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))
# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")
for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue
product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break
search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1
finally:
driver.quit() # Ensure the driver is closed after each try
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
We can now crawl a list of pages concurrently.
Step 5: Bypassing Anti-Bots
Anti-bots are tools that help spot and block traffic that isn't from real people. While they mainly aim at harmful software, they often mistakenly block scrapers too. To get around these anti-bots, we need to use a proxy.
To do this, we’ll create a function that takes a regular URL and gives back a URL that goes through the ScrapeOps Proxy API. We’ll send a message to ScrapeOps with the following details:
- "api_key": your ScrapeOps API key.
- "url": the URL you want to scrape.
- "country": the country you want to appear to be in.
- "wait": how long we want ScrapeOps to pause before sending back the response. This helps the page fully load.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
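As a rough illustration of what this wrapper returns (the query parameters are URL-encoded by urlencode):
target = "https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-1"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.redfin.com%2Fcity%2F12572%2FSC%2FMyrtle-Beach%2Fpage-1&country=us&wait=3000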
In the last version of our crawler below, we apply this proxy function to our URL while we're parsing it.
import os
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")
tries = 0
success = False
while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(scrapeops_proxy_url)
logger.info("Waiting for page to load...")
# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))
# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")
for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue
product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break
search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1
finally:
driver.quit() # Ensure the driver is closed after each try
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Let's get our crawler up and running for testing! We're going to crawl three pages.
Check out our main settings below; you can tweak any of these options if you want:
- MAX_RETRIES
- MAX_THREADS
- PAGES
- LOCATION
- location_list
if __name__ == "__main__":
start_time = time.time() # Start time
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
logger.info(f"Crawl starting...")
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
Our crawl finished in 64.54 seconds. 64.54 seconds / 3 pages = 21.51 seconds per page.
Build A Redfin Scraper
Now that we have a working crawler, we need to build the scraper itself.
First, our scraper will read the CSV file produced by the crawler. Then, it will go through each listing from our results and scrape them concurrently.
Step 1: Create Simple Listing Data Parser
To gather details for our listings, we'll start by writing a function to parse each listing page.
Just like before, we'll include error handling and a retry mechanism in case something goes wrong. As always, pay close attention to the parsing logic.
Here is our process_listing() function.
def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")
while tries <= retries and not success:
try:
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(url)
logger.info("Waiting for page to load...")
# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))
bedrooms = 0
bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
if bedroom_holder:
bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
bathrooms = 0.0
bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
if bathroom_holder:
bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
square_feet = 0
size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
if size_holder:
square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
price_differential = 0
difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
if difference_holder:
price_number = int(difference_holder.text.replace(",", ""))
color = difference_holder.get_attribute("class")
if color == "diffValue red":
price_differential = -price_number
else:
price_differential = price_number
property_data = {
"name": row["name"],
"bedrooms": bedrooms,
"bathrooms": bathrooms,
"square_feet": square_feet,
"price_differential": price_differential
}
print(property_data)
success = True
logger.info(f"Successfully parsed: {row['url']}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries - tries}")
tries += 1
finally:
driver.quit() # Ensure the driver is closed after each try
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
- We look for the bedroom section using driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']"). If bedrooms are listed on the page, we grab that info.
- Next, we do the same for the bathrooms by checking driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']").
- We also check the size by looking for driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']") and pull the value if it's there.
- Finally, we check the price difference with driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']") and get that information too.
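Two small parsing details are worth calling out in isolation; the values below are hypothetical:
# Redfin shows an em dash when a stat is missing, so we swap it for 0 before converting.
beds_text = "—"
bedrooms = int(beds_text.replace("—", "0"))   # 0

# The AVM difference is negative when the element's class includes "diffValue red".
price_number = 5000
element_class = "diffValue red"
price_differential = -price_number if "diffValue red" in element_class else price_number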
Step 2: Loading URLs To Scrape
When we use our parsing function, we need to give it a URL. Our crawler grabs a lot of URLs every time it runs. To get these URLs into our parser, we need to create another function like start_scrape().
We'll call this new one process_results().
def process_results(csv_file, location, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_listing(row, location, retries=retries)
You can check out how everything comes together in the complete code we’ve shared below.
import os
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
# Scrape search results function (unchanged)
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")
tries = 0
success = False
while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(scrapeops_proxy_url)
logger.info("Waiting for page to load...")
# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))
# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")
for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue
product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break
search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1
finally:
driver.quit() # Ensure the driver is closed after each try
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
# New function to process a single listing using Selenium
def process_listing(driver, row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Processing URL: {url}")
# Wait until the page is fully loaded
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))
# Extract bedroom information
try:
bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bedrooms = 0
# Extract bathroom information
try:
bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bathrooms = 0.0
# Extract square feet information
try:
size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
except NoSuchElementException:
square_feet = 0
# Extract price differential information
try:
difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
price_number = int(difference_holder.text.replace(",", ""))
color = difference_holder.get_attribute("class")
if "diffValue red" in color:
price_differential = -price_number
else:
price_differential = price_number
except NoSuchElementException:
price_differential = 0
# Construct the property data dictionary
property_data = {
"name": row["name"],
"bedrooms": bedrooms,
"bathrooms": bathrooms,
"square_feet": square_feet,
"price_differential": price_differential
}
logger.info(f"Successfully parsed property data: {property_data}")
success = True
except TimeoutException:
logger.warning(f"Page load timeout for URL: {url}")
tries += 1
except Exception as e:
logger.error(f"Exception occurred while processing {url}: {e}")
tries += 1
finally:
if tries > retries:
logger.error(f"Max retries reached for URL: {url}")
raise Exception(f"Max retries exceeded for {url}")
# New function to process the results from a CSV
def process_results(driver, csv_file, location, retries=3):
logger.info(f"Processing results from {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_listing(driver, row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Processing individual listings from CSV...")
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
for file in aggregate_files:
process_results(driver, file, LOCATION, retries=MAX_RETRIES)
driver.quit()
logger.info(f"Crawl complete.")
- The process_results() function opens our CSV file and iterates through each row, applying the process_listing() function to every listing in the file.
Step 3: Storing the Scraped Data
Right now, saving our data is really easy. We just need to create a new dataclass called PropertyData. It's similar to our SearchData, but it has different fields.
@dataclass
class PropertyData:
name: str = ""
bedrooms: int = 0
bathrooms: float = 0.0
square_feet: int = 0
price_differential: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty, set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
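As a quick sanity check, here is a minimal sketch (assuming the PropertyData class above is already defined; the values are hypothetical, not scraped data) showing how __post_init__ fills in empty string fields and strips stray whitespace:
# Minimal usage sketch for PropertyData (hypothetical values, not scraped data).
example = PropertyData(
    name="  123 Example Street  ",  # made-up listing name for illustration
    bedrooms=3,
    bathrooms=2.5,
    square_feet=1450,
    price_differential=-5000
)
print(example.name)  # "123 Example Street" -- trailing spaces stripped

empty = PropertyData()
print(empty.name)  # "No name" -- empty strings are replaced with a default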
In our complete code, we now start a DataPipeline and feed these new PropertyData objects into it.
import os
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class PropertyData:
name: str = ""
bedrooms: int = 0
bathrooms: float = 0.0
square_feet: int = 0
price_differential: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty, set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
# Scrape search results function
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")
tries = 0
success = False
while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(scrapeops_proxy_url)
logger.info("Waiting for page to load...")
# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))
# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")
for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue
product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break
search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1
finally:
driver.quit() # Ensure the driver is closed after each try
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
# Function to process a single listing using Selenium
def process_listing(driver, row, location, retries):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Processing URL: {url}")
# Wait until the page is fully loaded
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))
# Extract bedroom information
try:
bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bedrooms = 0
# Extract bathroom information
try:
bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bathrooms = 0.0
# Extract square feet information
try:
size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
except NoSuchElementException:
square_feet = 0
# Extract price differential information
try:
difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
price_number = int(difference_holder.text.replace(",", ""))
color = difference_holder.get_attribute("class")
if "diffValue red" in color:
price_differential = -price_number
else:
price_differential = price_number
except NoSuchElementException:
price_differential = 0
# Create a new DataPipeline instance for each property
property_filename = f"{row['name'].replace(' ', '-')}.csv"
property_pipeline = DataPipeline(csv_filename=property_filename)
# Reset names_seen for the new pipeline instance
property_pipeline.names_seen = []
# Create a PropertyData instance
property_data = PropertyData(
name=row["name"],
bedrooms=bedrooms,
bathrooms=bathrooms,
square_feet=square_feet,
price_differential=price_differential
)
# Add property data to the pipeline and save to individual CSV
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()
logger.info(f"Successfully parsed property data: {asdict(property_data)}")
success = True
except TimeoutException:
logger.warning(f"Page load timeout for URL: {url}")
tries += 1
except Exception as e:
logger.error(f"Exception occurred while processing {url}: {e}")
tries += 1
finally:
if tries > retries:
logger.error(f"Max retries reached for URL: {url}")
raise Exception(f"Max retries exceeded for {url}")
def process_results(driver, csv_file, location, retries):
logger.info(f"Processing results from {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_listing(driver, row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Processing individual listings from CSV...")
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
for file in aggregate_files:
process_results(driver, file, LOCATION, retries=MAX_RETRIES)
driver.quit()
logger.info(f"Crawl complete.")
In our parsing function, we start a new DataPipeline and add PropertyData objects to it, so each property gets its own separate report.
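As a small illustration of the naming scheme (the listing name below is hypothetical), each row from the crawl produces its own CSV file named after the property:
# Hypothetical row from the crawler's CSV, used only to show the filename pattern.
row = {"name": "123 Example St Myrtle Beach SC 29577"}
property_filename = f"{row['name'].replace(' ', '-')}.csv"
print(property_filename)  # 123-Example-St-Myrtle-Beach-SC-29577.csv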
Step 4: Adding Concurrency
We'll use ThreadPoolExecutor to run tasks concurrently, just like we did before. We only need to replace the for loop inside process_results() with a call to executor.map().
def process_results(driver, csv_file, location, max_threads=5, retries=3):
logger.info(f"Processing results from {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_listing,
[driver] * len(reader),
reader,
[location] * len(reader),
[retries] * len(reader)
)
The arguments are much the same as before. process_listing is the function we want each available thread to run. All of the other arguments are passed in as arrays, and executor.map takes one element from each array and forwards it to each process_listing call. Keep in mind that a Selenium WebDriver instance is not thread-safe, so sharing a single driver across many workers can cause problems; the production version below caps the listing executor at a single worker.
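If the way executor.map spreads those arrays across threads seems opaque, here is a tiny standalone sketch (using a dummy function in place of process_listing) showing how one element is taken from each iterable per call:
import concurrent.futures

# Stand-in for process_listing: just reports which arguments it received.
def dummy_listing(driver, row, location, retries):
    return f"{driver} processed {row['name']} in {location} (retries={retries})"

rows = [{"name": "listing-1"}, {"name": "listing-2"}, {"name": "listing-3"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        dummy_listing,
        ["driver"] * len(rows),   # same driver placeholder for every call
        rows,                     # one row per call
        ["us"] * len(rows),       # same location for every call
        [3] * len(rows)           # same retry count for every call
    )
    for result in results:
        print(result)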
Step 5: Bypassing Anti-Bots
We already have the tools we need to get past Redfin's anti-bot measures; we just need to call our proxy function in the right place. Only one line in process_listing() has to change. The line below is the key to making it all work:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
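To see what that one line actually produces, here is a small standalone sketch of the same URL construction (the API key and listing URL below are placeholders, not real values):
from urllib.parse import urlencode

# Placeholder values for illustration only -- not a real key or a real listing URL.
api_key = "your-super-secret-api-key"
url = "https://www.redfin.com/SC/Myrtle-Beach/example-listing"

payload = {"api_key": api_key, "url": url, "country": "us", "wait": 3000}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
# The target URL ends up URL-encoded inside the proxy URL, so the request is
# routed through ScrapeOps instead of going to Redfin directly.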
Our complete code is now ready to run in production.
import os
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class PropertyData:
name: str = ""
bedrooms: int = 0
bathrooms: float = 0.0
square_feet: int = 0
price_differential: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty, set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
# Scrape search results function
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")
tries = 0
success = False
while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(scrapeops_proxy_url)
logger.info("Waiting for page to load...")
# Increase the wait time for the page to load
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))
# Once we find the script tag, extract its content
script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
if not script_tags:
raise Exception("No script tags found on the page.")
for script in script_tags:
json_data = json.loads(script.get_attribute('innerText'))
if not isinstance(json_data, list):
continue
product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break
search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1
finally:
driver.quit() # Ensure the driver is closed after each try
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
# Function to process a single listing using Selenium
def process_listing(driver, row, location, retries):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
# Use the ScrapeOps proxy URL
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Processing URL: {url}")
# Wait until the page is fully loaded
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))
# Extract bedroom information
try:
bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bedrooms = 0
# Extract bathroom information
try:
bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
except NoSuchElementException:
bathrooms = 0.0
# Extract square feet information
try:
size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
except NoSuchElementException:
square_feet = 0
# Extract price differential information
try:
difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
price_number = int(difference_holder.text.replace(",", ""))
color = difference_holder.get_attribute("class")
if "diffValue red" in color:
price_differential = -price_number
else:
price_differential = price_number
except NoSuchElementException:
price_differential = 0
# Create a new DataPipeline instance for each property
property_filename = f"{row['name'].replace(' ', '-')}.csv"
property_pipeline = DataPipeline(csv_filename=property_filename)
# Reset names_seen for the new pipeline instance
property_pipeline.names_seen = []
# Create a PropertyData instance
property_data = PropertyData(
name=row["name"],
bedrooms=bedrooms,
bathrooms=bathrooms,
square_feet=square_feet,
price_differential=price_differential
)
# Add property data to the pipeline and save to individual CSV
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()
logger.info(f"Successfully parsed property data: {asdict(property_data)}")
success = True
except TimeoutException:
logger.warning(f"Page load timeout for URL: {url}")
tries += 1
except Exception as e:
logger.error(f"Exception occurred while processing {url}: {e}")
tries += 1
finally:
if tries > retries:
logger.error(f"Max retries reached for URL: {url}")
raise Exception(f"Max retries exceeded for {url}")
def process_results(driver, csv_file, location, max_threads=5, retries=3):
logger.info(f"Processing results from {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
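        # Note: max_workers is fixed at 1 in the executor below. Every call to
        # process_listing() here shares the same WebDriver instance, and a single
        # Selenium driver is not safe to use from multiple threads at once.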
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
executor.map(
process_listing,
[driver] * len(reader),
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
start_time = time.time()
MAX_RETRIES = 3
MAX_THREADS = 1
PAGES = 3
LOCATION = "us"
logger.info(f"Crawl starting...")
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Processing individual listings from CSV...")
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
for file in aggregate_files:
process_results(driver, file, LOCATION, retries=MAX_RETRIES)
driver.quit()
logger.info(f"Crawl complete.")
end_time = time.time() # Record end time
execution_time = end_time - start_time
logger.info(f"Total execution time: {execution_time:.2f} seconds.")
Step 6: Production Run
Check out our main section. Just like before, we’ll be doing a 3-page crawl.
if __name__ == "__main__":
start_time = time.time()
MAX_RETRIES = 3
MAX_THREADS = 1
PAGES = 3
LOCATION = "us"
logger.info(f"Crawl starting...")
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Processing individual listings from CSV...")
options = Options()
options.add_argument("--headless=new") # Use 'new' headless mode for Chrome
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
for file in aggregate_files:
process_results(driver, file, LOCATION, retries=MAX_RETRIES)
driver.quit()
logger.info(f"Crawl complete.")
end_time = time.time() # Record end time
execution_time = end_time - start_time
logger.info(f"Total execution time: {execution_time:.2f} seconds.")
We generated a report with 120 results.
If you remember from earlier, our 3-page crawl took 21.51 seconds, while the full crawl and scrape took 4692.13 seconds. 4692.13 - 21.51 = 4670.62 seconds spent scraping individual listings. 4670.62 seconds / 120 results = 38.92 seconds per result.
Legal and Ethical Considerations
When scraping a website, you must follow its Terms of Use and robots.txt guidelines.
You can view Redfin's terms here. Their robots.txt is available for review here.
Violating these rules could lead to suspension or even permanent deletion of your account.
In this guide, we only scraped publicly available data.
Based on the outcomes of numerous court cases, scraping publicly available data is generally considered legal. Scraping private data (data gated behind a login) is a completely different story.
If you're unsure of the legality of your scraper, contact an attorney.
Conclusion
You now know how to crawl and scrape Redfin. You've received a crash course in Selenium, and you should have a solid understanding of our iterative build process. You should also know how to work with JSON data embedded in a page and how to extract details from rendered HTML.
If you're interested in any of the tech used in this article, check out these links.
More Python Web Scraping Guides
At ScrapeOps, we wrote the playbook on scraping with Python Selenium. Whether you're brand new to coding or a seasoned developer, we've got something to help take your skills to the next level. Take this new knowledge and go build something.
If you're interested in learning more from our "How To Scrape" series, take a look at the links below.