

How to Scrape Redfin With Requests and BeautifulSoup

For the last 20 years, Redfin has been a great place to shop for real estate. When we scrape Redfin, we can pull all sorts of property details and build up a sizable aggregate dataset.

Throughout this guide, you'll learn how to scrape Redfin.


TLDR - How to Scrape Redfin

Don't have time to read? Use our pre-built scraper below! First, create a config.json file containing your API key: {"api_key": "your-super-secret-api-key"}. Then copy and paste the code below into a Python file.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 3000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    price: int = 0
    price_currency: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class PropertyData:
    name: str = ""
    bedrooms: int = 0
    bathrooms: float = 0.0
    square_feet: int = 0
    price_differential: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()



def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
    formatted_locality = search_info["locality"].replace(" ", "-")
    url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")

            script_tags = soup.select("script[type='application/ld+json']")
            for script in script_tags:
                json_data = json.loads(script.text)
                if type(json_data) != list:
                    continue

                product = {}
                for element in json_data:
                    if element["@type"] == "Product":
                        product = element
                        break

                search_data = SearchData(
                    name=product["name"],
                    price=product["offers"]["price"],
                    price_currency=product["offers"]["priceCurrency"],
                    url=product["url"]
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [search_info] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")

                bedrooms = 0
                bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']")
                if bedroom_holder:
                    bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0"))
                bathrooms = 0.0
                bathroom_holder = soup.select_one("div[data-rf-test-id='abp-baths']")
                if bathroom_holder:
                    bathrooms = float(bathroom_holder.find("div", class_="statsValue").text.replace("—", "0"))
                square_feet = 0
                size_holder = soup.select_one("div[data-rf-test-id='abp-sqFt']")
                if size_holder:
                    square_feet = int(size_holder.find("span", class_="statsValue").text.replace(",", ""))
                price_differential = 0
                difference_holder = soup.select_one("span[data-rf-test-name='avmDiffValue']")
                if difference_holder:
                    price_number = int(difference_holder.text.replace(",", ""))
                    # .get("class") returns a list of class names, e.g. ["diffValue", "red"]
                    color = difference_holder.get("class")
                    if color and "red" in color:
                        price_differential = -price_number
                    else:
                        price_differential = price_number
                property_pipeline = DataPipeline(f"{row['name'].replace(' ', '-')}.csv")
                property_data = PropertyData(
                    name=row["name"],
                    bedrooms=bedrooms,
                    bathrooms=bathrooms,
                    square_feet=square_feet,
                    price_differential=price_differential
                )
                property_pipeline.add_data(property_data)
                property_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    ## Job Processes
    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

You can run this script with the following command:

python name_of_your_script.py

You'll first get a report named after the city you're crawling. Next, you'll get an individual CSV file for every one of the listings in your crawl report.

To change your results, feel free to change any of the following:

  • MAX_THREADS: Controls the maximum number of concurrent threads that can be used during scraping.
  • MAX_RETRIES: Sets the maximum number of retries if a request fails (e.g., due to a timeout or a 500 error).
  • PAGES: Specifies the number of pages to scrape for each locality.
  • LOCATION: Indicates the geographic location (country) from which the request is being sent.
  • location_list: A list of dictionaries where each dictionary contains information about a specific search area, including the city ID (id_number), state (state), and locality (locality).

If you choose to change your location_list, make sure to find the id_number for your individual locality. We've got a section on that here.
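For example, adding a second market to the crawl is just another dictionary in the list. Here's a quick sketch; the second entry uses obviously hypothetical placeholder values that you'd swap out for the real id_number, state, and locality of your city.

location_list = [
    {"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"},
    # Hypothetical placeholder entry: replace with the real values for your city
    {"id_number": 12345, "state": "XX", "locality": "Your City"}
]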


How To Architect Our Redfin Scraper

When we scrape Redfin, like with many of our other projects in this "How To Scrape" series, we'll write both a crawler and a scraper.

Our crawler will perform a search on Redfin and our scraper is going to go through and scrape details for all of the properties that we saved during the crawl.

Here is the basic process for our crawler:

  1. Perform a search and parse the results.
  2. Control our results with pagination.
  3. Store our data in a CSV file.
  4. Concurrently run steps 1 through 3.
  5. Use proxy integration to get past anti-bots.

After our crawl, our scraper needs to do these tasks:

  1. Read our CSV file.
  2. Parse the data from each individual listing.
  3. Store this newly parsed data to a CSV.
  4. Concurrently run steps 2 and 3.
  5. Once again, utilize proxy integration.

Understanding How To Scrape Redfin

Before we build our crawler and scraper, we need to understand exactly what information we want to scrape and where our data is located.

  • On our search pages, the data is located in a JSON blob on the page.
  • On each individual listing page, the data we're looking for is directly inside the HTML we see on the page.

Step 1: How To Request Redfin Pages

We'll get started by requesting Redfin pages. As always, when you fetch a page, you need to make a GET request. With both our browser and Python Requests, we receive a response back in the form of an HTML page. It's up to us to dig through this HTML and find our data.

Redfin search page URLs are laid out like this:

https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-2

The structure goes:

https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number}
  • In this example, 12572 is our id_number.
  • Our state is SC.
  • The city is Myrtle-Beach and our page_number is 2.

Before we scrape these pages, we need to collect the id_number, state and city. You can see all of this in the image below. You need to find the id_number for a location before doing your scrape.

Search Results Page
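Building these search URLs in Python is plain string formatting. Here's a small sketch using the Myrtle Beach values from above; note how spaces in the locality get swapped for hyphens, just like our crawler does.

id_number = 12572
state = "SC"
locality = "Myrtle Beach"
page_number = 2

# Spaces in the locality become hyphens in the URL
formatted_locality = locality.replace(" ", "-")
url = f"https://www.redfin.com/city/{id_number}/{state}/{formatted_locality}/page-{page_number}"
print(url)
# https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-2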

When we're scraping individual property pages, we get URLs like this:

https://www.redfin.com/SC/Myrtle-Beach/1501-N-Ocean-Blvd-29577/unit-232/home/170856032

Their layout is:

https://www.redfin.com/{state}/{city}/{address}/unit-{unit_number}/home/{listing_id}

These variables are going to be a lot harder to reproduce. Instead of reconstructing these, we're going to scrape them during our crawl.

List Page


Step 2: How To Extract Data From Redfin Results and Pages

As mentioned earlier, we'll be extracting data from a JSON blob on the search results page, and we'll be pulling our data straight from HTML elements on individual listing pages. Check out the images below to get a better feel for this.

Here is a JSON blob from our search results.

HTML Inspection Search Results

Below, you can see a bedroom count embedded within a div with a data-rf-test-id of 'abp-beds'.

HTML Inspection Listing Page
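If you'd like to poke at this yourself, here's a minimal sketch of pulling that bedroom count out with BeautifulSoup. It assumes you already have the raw HTML of a listing page in a string called html.

from bs4 import BeautifulSoup

# html is assumed to hold the listing page's HTML that you've already fetched
soup = BeautifulSoup(html, "html.parser")

bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']")
if bedroom_holder:
    # Redfin shows an em dash when the count is missing, so treat it as 0
    bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0"))
    print(bedrooms)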


Step 3: How To Control Pagination

Controlling our pagination is going to be easy. Remember our URL structure from earlier:

https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number}

The end of the URL is what's most important: page-{page_number}.

We'll actually be using page_number+1 because Python's range() begins counting at 0, but our pages start at 1.
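Here's a quick sketch of how that offset plays out when we generate page URLs for the Myrtle Beach search used throughout this guide.

pages = 3
for page_number in range(pages):
    # range() counts from 0, Redfin's pages start at 1, so we add 1
    url = f"https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-{page_number+1}"
    print(url)
# Prints the page-1, page-2 and page-3 URLs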


Step 4: Geolocated Data

To handle our geolocation, we're going to be using the ScrapeOps Proxy API. When we're talking to the ScrapeOps server, we can give it a country param and we'll be routed through a server in the country of our choosing.

If we pass {"country": "us"} into the API, we'll be routed through a server in the US. Our full list of supported countries is available here.
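Here's a minimal sketch of how that country param fits into a proxied URL. The get_scrapeops_url() function we build later in this guide does exactly this (and adds a wait parameter as well).

from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # normally loaded from config.json

payload = {
    "api_key": API_KEY,
    "url": "https://www.redfin.com/city/12572/SC/Myrtle-Beach",
    "country": "us"  # route the request through a US-based server
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)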


Setting Up Our Redfin Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir redfin-scraper

cd redfin-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4
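The scripts in this guide also expect a config.json file, sitting next to your script, that holds your ScrapeOps API key. A minimal sketch of creating and loading it:

# config.json (same folder as your script):
# {"api_key": "your-super-secret-api-key"}

import json

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]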

Build A Redfin Search Crawler

Now that we know precisely what we want to do, it's time to build our crawler. We're going to go through and add all of the following through iterative development.

  1. Build a basic script with a parsing function.
  2. Add pagination to our parser.
  3. Create proper storage for our data.
  4. Use multithreading to add concurrency.
  5. Integrate with the ScrapeOps Proxy API in order to bypass anti-bots.

Step 1: Create Simple Search Data Parser

We'll start by setting up a basic script with a parsing function. The goal here is pretty simple: set up a script with error handling, retry logic and a parsing function.

The code below does just that. Give some special attention to our parsing function; this is where the real magic happens.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



def scrape_search_results(search_info, location, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.text)
if type(json_data) != list:
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = {
"name": product["name"],
"price": product["offers"]["price"],
"price_currency": product["offers"]["priceCurrency"],
"url": product["url"]
}

print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

## Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

scrape_search_results(search_area, LOCATION, retries=MAX_RETRIES)
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

In the code above, you should pay special attention to the following:

  • location_list is an array of dict objects that we wish to crawl. We use a dict because each locality has 3 variables we need: "id_number", "state", and "locality".
  • We then find all the embedded JSON blobs using their CSS selector: script[type='application/ld+json'].
  • We filter out all elements that are not a "Product"; this leaves us with only our listings to deal with.
  • We then pull the "name", "price", "price_currency", and "url" from each product.
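To make that filtering step concrete, here's a simplified sketch of the kind of Product entry the parser pulls out of the ld+json blob. The field names come straight from the parsing code; the values here are illustrative placeholders, and real entries carry far more data.

product = {
    "@type": "Product",
    "name": "Example Listing Name",
    "url": "https://www.redfin.com/SC/Myrtle-Beach/1501-N-Ocean-Blvd-29577/unit-232/home/170856032",
    "offers": {"price": 100000, "priceCurrency": "USD"}
}

# The crawler keeps exactly these four values
print(product["name"], product["offers"]["price"], product["offers"]["priceCurrency"], product["url"])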

Step 2: Add Pagination

Adding pagination to our crawler is super simple. We need to add one parameter to the end of our URL, and we need to write a function that runs through a list of pages.

Our URLs will now end like this: page-{page_number+1}. We use page_number+1 because our range() function begins counting at 0 and our pages begin at 1.

Here is our new URL format:

https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}

In this next snippet, you can see start_scrape() which runs our parsing function on a list of pages.

def start_scrape(search_info, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(search_info, location, page, retries=retries)

Our full Python script now looks like this.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



def scrape_search_results(search_info, location, page_number, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.text)
if type(json_data) != list:
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = {
"name": product["name"],
"price": product["offers"]["price"],
"price_currency": product["offers"]["priceCurrency"],
"url": product["url"]
}

print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(search_info, pages, location, retries=3):
for page in range(pages):
scrape_search_results(search_info, location, page, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

## Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

start_scrape(search_area, PAGES, LOCATION, retries=MAX_RETRIES)
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • Our URLs now contain a page_number so we can request specific pages.
  • start_scrape() allows us to parse a list of pages.

Step 3: Storing the Scraped Data

Proper data storage is vital to any scraping project. We need a way to represent the objects we scrape and we also need a way to save our data to a CSV. We're going to make two classes, SearchData and DataPipeline.

SearchData will represent individual listing objects and DataPipeline will pipe these objects to a CSV file.

Take a look at SearchData. It holds all the information we were scraping with our parsing function.

@dataclass
class SearchData:
    name: str = ""
    price: int = 0
    price_currency: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here is the DataPipeline we use to store the objects above inside a CSV file.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

In our full code below, we open a DataPipeline and pass it into start_scrape(). From the parsing function, we then convert all of our scraped data into SearchData and pass it into the DataPipeline.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.text)
if type(json_data) != list:
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(search_info, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(search_info, location, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

## Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • We use SearchData to represent real results from the search.
  • These objects get passed into our DataPipeline for storage inside our CSV file.

Step 4: Adding Concurrency

Next, we need to be able to scrape multiple pages concurrently. To handle this, we're going to use ThreadPoolExecutor. This will replace our for loop that iterates through the list of pages.

Here is our new start_scrape().

def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [search_info] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

Look at our arguments used in executor.map():

  • scrape_search_results: the function we wish to call on each thread.
  • search_info: passed in as an array the length of our page list.
  • location: passed in as an array the length of our page list.
  • range(pages): our list of pages.
  • data_pipeline: passed in as an array the length of our page list.
  • retries: also passed in as an array the length of our page list.
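In effect, executor.map() lines these lists up element by element, so for a 3-page crawl it schedules the equivalent of these three calls across the thread pool:

# Equivalent calls scheduled for pages = 3
scrape_search_results(search_info, location, 0, data_pipeline, retries)
scrape_search_results(search_info, location, 1, data_pipeline, retries)
scrape_search_results(search_info, location, 2, data_pipeline, retries)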

Here is our fully updated code.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.text)
if type(json_data) != list:
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

## Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

We can now crawl a list of pages concurrently.


Step 5: Bypassing Anti-Bots

Anti-bots are designed to detect and block non-human traffic. While they're targeted at malicious software, scrapers tend to trip them up and get blocked. To get past anti-bots, we need to use a proxy.

In order to accomplish this, we're going to write a function that takes in a regular URL and returns a URL that's been routed through the ScrapeOps Proxy API. We'll send a payload to ScrapeOps that looks like this:

  • "api_key": your ScrapeOps API key.
  • "url": the url that you'd like to scrape.
  • "country": the country that we want to appear in.
  • "wait": the amount of time we want ScrapeOps to wait before sending our response back. This allows our page to render.
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 3000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

In our final iteration of the crawler below, we run our url through this proxy function inside the parsing function.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.text)
if type(json_data) != list:
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

## Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

It's time to run our crawler in production and test everything out! We're going to crawl 3 pages. Take a look at our main below and feel free to change any of the following:

  • MAX_RETRIES
  • MAX_THREADS
  • PAGES
  • LOCATION
  • location_list
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    ## Job Processes
    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

As you can see in the image below, our crawl finished in 13.727 seconds. 13.727 seconds / 3 pages = 4.576 seconds per page.

Crawler Results Terminal


Build A Redfin Scraper

Now that we've got a working crawler, we need to build a functional scraper. Our scraper needs to first read the CSV file generated by the crawler. Then it needs to go through and concurrently scrape each listing from our crawl results.
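Concretely, each row our scraper works from is just a dict keyed by the columns the crawler wrote (name, price, price_currency, url). A quick sketch of reading them back, assuming the Myrtle-Beach.csv file from our earlier crawl exists:

import csv

with open("Myrtle-Beach.csv", newline="") as file:
    reader = list(csv.DictReader(file))

for row in reader:
    # Each row is a dict; the scraper mainly needs the url column
    print(row["name"], row["url"])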


Step 1: Create Simple Listing Data Parser

To scrape our listings, we'll create a parsing function first. Like earlier, we're going to add parsing, error handling, and retry logic. As before, pay special attention to the parsing logic.

Here is our process_listing() function.

def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")

                bedrooms = 0
                bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']")
                if bedroom_holder:
                    bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0"))
                bathrooms = 0.0
                bathroom_holder = soup.select_one("div[data-rf-test-id='abp-baths']")
                if bathroom_holder:
                    bathrooms = float(bathroom_holder.find("div", class_="statsValue").text.replace("—", "0"))
                square_feet = 0
                size_holder = soup.select_one("div[data-rf-test-id='abp-sqFt']")
                if size_holder:
                    square_feet = int(size_holder.find("span", class_="statsValue").text.replace(",", ""))
                price_differential = 0
                difference_holder = soup.select_one("span[data-rf-test-name='avmDiffValue']")
                if difference_holder:
                    price_number = int(difference_holder.text.replace(",", ""))
                    # .get("class") returns a list of class names, e.g. ["diffValue", "red"]
                    color = difference_holder.get("class")
                    if color and "red" in color:
                        price_differential = -price_number
                    else:
                        price_differential = price_number
                property_data = {
                    "name": row["name"],
                    "bedrooms": bedrooms,
                    "bathrooms": bathrooms,
                    "square_feet": square_feet,
                    "price_differential": price_differential
                }

                print(property_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
  • We check for a bedroom_holder with soup.select_one("div[data-rf-test-id='abp-beds']") and if there are bedrooms present on the page, we extract them.
  • We run the same process on the bathroom_holder, "div[data-rf-test-id='abp-baths']".
  • Check for the size_holder, soup.select_one("div[data-rf-test-id='abp-sqFt']") and extract its value if there is one present.
  • We also do the same with the price difference, soup.select_one("span[data-rf-test-name='avmDiffValue']").

Step 2: Loading URLs To Scrape

When we use our parsing function, it needs a URL. We scrape a boatload of URLs whenever we run our crawler. In order to feed these URLs into our parser, we need to write another function similar to start_scrape(); we'll call this one process_results().

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_listing(row, location, retries=retries)

You can see how it all fits together in our full code below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.text)
if type(json_data) != list:
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

bedrooms = 0
bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']")
if bedroom_holder:
bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0"))
bathrooms = 0.0
bathroom_holder = soup.select_one("div[data-rf-test-id='abp-baths']")
if bathroom_holder:
bathrooms = float(bathroom_holder.find("div", class_="statsValue").text.replace("—", "0"))
square_feet = 0
size_holder = soup.select_one("div[data-rf-test-id='abp-sqFt']")
if size_holder:
square_feet = int(size_holder.find("span", class_="statsValue").text.replace(",", ""))
price_differential = 0
difference_holder = soup.select_one("span[data-rf-test-name='avmDiffValue']")
if difference_holder:
price_number = int(difference_holder.text.replace(",", ""))
# .get("class") returns a list of class names, e.g. ["diffValue", "red"]
color = difference_holder.get("class")
if color and "red" in color:
price_differential = -price_number
else:
price_differential = price_number
property_data = {
"name": row["name"],
"bedrooms": bedrooms,
"bathrooms": bathrooms,
"square_feet": square_feet,
"price_differential": price_differential
}

print(property_data)
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_listing(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

## Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
  • process_results() reads our CSV file and then runs process_listing() on each result from the file.

Step 3: Storing the Scraped Data

At this point, storing our data is pretty simple. We just need a new dataclass to work with. We'll call this one PropertyData. It's just like our SearchData, but it holds different fields.

@dataclass
class PropertyData:
    name: str = ""
    bedrooms: int = 0
    bathrooms: float = 0.0
    square_feet: int = 0
    price_differential: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In our full code, we now open a DataPipeline and pass these new PropertyData objects into it.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class PropertyData:
name: str = ""
bedrooms: int = 0
bathrooms: float = 0.0
square_feet: int = 0
price_differential: int = 0


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.text)
if type(json_data) != list:
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[search_info] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

bedrooms = 0
bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']")
if bedroom_holder:
bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0"))
bathrooms = 0.0
bathroom_holder = soup.select_one("div[data-rf-test-id='abp-baths']")
if bathroom_holder:
bathrooms = float(bathroom_holder.find("div", class_="statsValue").text.replace("—", "0"))
square_feet = 0
size_holder = soup.select_one("div[data-rf-test-id='abp-sqFt']")
if size_holder:
square_feet = int(size_holder.find("span", class_="statsValue").text.replace(",", ""))
price_differential = 0
difference_holder = soup.select_one("span[data-rf-test-name='avmDiffValue']")
if difference_holder:
price_number = int(difference_holder.text.replace(",", ""))
# BeautifulSoup returns "class" as a list of class names, e.g. ["diffValue", "red"]
color = difference_holder.get("class", [])
if "red" in color:
price_differential = -price_number
else:
price_differential = price_number
property_pipeline = DataPipeline(f"{row['name'].replace(' ', '-')}.csv")
property_data = PropertyData(
name=row["name"],
bedrooms=bedrooms,
bathrooms=bathrooms,
square_feet=square_feet,
price_differential=price_differential
)
property_pipeline.add_data(property_data)
property_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_listing(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of search areas to scrape
location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
aggregate_files = []

## Job Processes
for search_area in location_list:
filename = search_area["locality"].replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

From within our parsing function:

  • We open a new DataPipeline for each listing, named after the property.
  • We pass a PropertyData object into this new pipeline so each property gets its own individual report (a condensed sketch of the pattern follows the list).
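
Condensed, the per-listing pattern inside process_listing() looks roughly like this. The row values and the scraped numbers below are placeholders for illustration, not real data:

# Hypothetical row from the crawl CSV
row = {"name": "123 Main St Myrtle Beach", "url": "https://www.redfin.com/..."}

# One pipeline per listing, named after the property
property_pipeline = DataPipeline(f"{row['name'].replace(' ', '-')}.csv")

# A single PropertyData object holds everything parsed for that listing
property_pipeline.add_data(PropertyData(
    name=row["name"],
    bedrooms=3,                # placeholder values, not scraped
    bathrooms=2.0,
    square_feet=1500,
    price_differential=-5000
))

# close_pipeline() flushes the queue so the per-property CSV gets written
property_pipeline.close_pipeline()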

Step 4: Adding Concurrency

We'll use ThreadPoolExecutor for concurrency, just like we did before. We only need to refactor the for loop inside process_results().

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_listing,
reader,
[location] * len(reader),
[retries] * len(reader)
)

The arguments are essentially the same as before:

  • process_listing: the function we want to call on each available thread.
  • All other arguments are passed in as lists of equal length; executor.map takes one element from each list per call and passes them into process_listing (see the small example below).
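
If the way executor.map fans those lists out isn't obvious, here is a tiny standalone example. The function and values are made up purely for illustration and mirror the positional shape of process_listing:

from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for process_listing(row, location, retries)
def fake_process_listing(row, location, retries):
    return f"{row['url']} | {location} | {retries}"

rows = [{"url": "https://example.com/a"}, {"url": "https://example.com/b"}]

with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(
        fake_process_listing,
        rows,                  # a different row for each call
        ["us"] * len(rows),    # the same location for every call
        [3] * len(rows)        # the same retry count for every call
    )
    print(list(results))
# ['https://example.com/a | us | 3', 'https://example.com/b | us | 3']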

Step 5: Bypassing Anti-Bots

We already have the functionality we need to get past anti-bots; we just need to call our proxy function in the right place. Only one line of process_listing() needs to change.

Look at the line below; it holds the key to everything.

response = requests.get(get_scrapeops_url(url, location=location))
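
If you want to confirm the proxy wiring before a full run, you can print the wrapped URL for any listing. This is just a quick sanity check; the listing URL below is made up:

# Hypothetical listing URL, purely to show the shape of the proxied request
listing_url = "https://www.redfin.com/SC/Myrtle-Beach/123-Main-St/home/1234567"
print(get_scrapeops_url(listing_url, location="us"))
# Prints something like:
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.redfin.com%2FSC%2F...&country=us&wait=3000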

Our full project is now ready to run in production.

import os
import csv
import time  # used by DataPipeline.close_pipeline()
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 3000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
name: str = ""
price: int = 0
price_currency: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class PropertyData:
name: str = ""
bedrooms: int = 0
bathrooms: float = 0.0
square_feet: int = 0
price_differential: int = 0


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
formatted_locality = search_info["locality"].replace(" ", "-")
url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")

script_tags = soup.select("script[type='application/ld+json']")
for script in script_tags:
json_data = json.loads(script.text)
if type(json_data) != list:
continue

product = {}
for element in json_data:
if element["@type"] == "Product":
product = element
break

search_data = SearchData(
name=product["name"],
price=product["offers"]["price"],
price_currency=product["offers"]["priceCurrency"],
url=product["url"]
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: