Then check out ScrapeOps, the complete toolkit for web scraping.

First, create a `config.json` file with your API key: `{"api_key": "your-super-secret-api-key"}`.
Then, copy and paste the code below into a Python file.

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 3000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    price: int = 0
    price_currency: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class PropertyData:
    name: str = ""
    bedrooms: int = 0
    bathrooms: float = 0.0
    square_feet: int = 0
    price_differential: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
    formatted_locality = search_info["locality"].replace(" ", "-")
    url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tags = soup.select("script[type='application/ld+json']")
            for script in script_tags:
                json_data = json.loads(script.text)
                if type(json_data) != list:
                    continue
                product = {}
                for element in json_data:
                    if element["@type"] == "Product":
                        product = element
                        break
                search_data = SearchData(
                    name=product["name"],
                    price=product["offers"]["price"],
                    price_currency=product["offers"]["priceCurrency"],
                    url=product["url"]
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [search_info] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                bedrooms = 0
                bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']")
                if bedroom_holder:
                    bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0"))

                bathrooms = 0.0
                bathroom_holder = soup.select_one("div[data-rf-test-id='abp-baths']")
                if bathroom_holder:
                    bathrooms = float(bathroom_holder.find("div", class_="statsValue").text.replace("—", "0"))

                square_feet = 0
                size_holder = soup.select_one("div[data-rf-test-id='abp-sqFt']")
                if size_holder:
                    square_feet = int(size_holder.find("span", class_="statsValue").text.replace(",", ""))

                price_differential = 0
                difference_holder = soup.select_one("span[data-rf-test-name='avmDiffValue']")
                if difference_holder:
                    price_number = int(difference_holder.text.replace(",", ""))
                    # get("class") returns a list of class names, e.g. ["diffValue", "red"]
                    color = difference_holder.get("class")
                    if "red" in color:
                        price_differential = -price_number
                    else:
                        price_differential = price_number

                property_pipeline = DataPipeline(f"{row['name'].replace(' ', '-')}.csv")
                property_data = PropertyData(
                    name=row["name"],
                    bedrooms=bedrooms,
                    bathrooms=bathrooms,
                    square_feet=square_feet,
                    price_differential=price_differential
                )
                property_pipeline.add_data(property_data)
                property_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    ## Job Processes
    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
To run the script, use:

```
python name_of_your_script.py
```
If you'd like to tweak your results, feel free to change any of the following:

- `MAX_THREADS`: Controls the maximum number of concurrent threads that can be used during scraping.
- `MAX_RETRIES`: Sets the maximum number of retries if a request fails (e.g., due to a timeout or a 500 error).
- `PAGES`: Specifies the number of pages to scrape for each locality.
- `LOCATION`: Indicates the geographic location (country) from which the request is being sent.
- `location_list`: A list of dictionaries where each dictionary contains information about a specific search area, including the city ID (`id_number`), state (`state`), and locality (`locality`).

When you edit `location_list`, make sure to find the `id_number` for your individual locality. We've got a section on that here.

Take a look at this example search URL:

```
https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-2
```
Search URLs follow this format:

```
https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number}
```

In the example above, `12572` is our `id_number`, `SC` is our state, `Myrtle-Beach` is our city, and our `page_number` is `2`. So, to build a search URL, we need an `id_number`, a `state`, and a `city`. You can see all of this in the image below. You need to find the `id_number` for a location before doing your scrape.

Individual listing pages use URLs like this one:

```
https://www.redfin.com/SC/Myrtle-Beach/1501-N-Ocean-Blvd-29577/unit-232/home/170856032
```

They follow this format:

```
https://www.redfin.com/{state}/{city}/{address}/unit-{unit_number}/home/{listing_id}
```
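As a quick illustration, here is a small sketch that assembles both URL shapes from their parts. The helper functions are our own, for illustration only; the scraper itself builds its URLs inline with f-strings.

```python
# Hypothetical helpers, not part of the scraper code below.
def build_search_url(id_number, state, city, page_number):
    # e.g. https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-2
    return f"https://www.redfin.com/city/{id_number}/{state}/{city.replace(' ', '-')}/page-{page_number}"

def build_listing_url(state, city, address, unit_number, listing_id):
    # e.g. https://www.redfin.com/SC/Myrtle-Beach/1501-N-Ocean-Blvd-29577/unit-232/home/170856032
    return f"https://www.redfin.com/{state}/{city}/{address}/unit-{unit_number}/home/{listing_id}"

print(build_search_url(12572, "SC", "Myrtle Beach", 2))
```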
On individual listing pages, stats such as the bedroom count are embedded in elements we can target by attribute, for example a `div` with a `data-rf-test-id` of `'abp-beds'`.
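Here's a minimal sketch of what that extraction looks like with BeautifulSoup. The HTML fragment is made up for illustration; the real extraction happens later inside `process_listing()`.

```python
from bs4 import BeautifulSoup

# A made-up fragment of a listing page, just for illustration.
html = '<div data-rf-test-id="abp-beds"><div class="statsValue">3</div></div>'

soup = BeautifulSoup(html, "html.parser")
bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']")
if bedroom_holder:
    # Redfin shows "—" when a listing has no bedroom count, so treat it as 0
    bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0"))
    print(bedrooms)  # 3
```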
To paginate our results, we append `page-{page_number}` to our search URL:

```
https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number}
```

We'll actually be using `page_number+1` because Python's `range()` begins counting at 0, but our pages start at 1.

For geolocation, the ScrapeOps Proxy API takes a `country` param and we'll be routed through a server in the country of our choosing. If we pass `{"country": "us"}` into the API, we'll be routed through a server in the US. Our full list of supported countries is available here.

Now, let's set up our project. Create a new project folder:

```
mkdir redfin-scraper
cd redfin-scraper
```

Create a virtual environment and activate it:

```
python -m venv venv
source venv/bin/activate
```

Then install our dependencies:

```
pip install requests
pip install beautifulsoup4
```
Once everything is installed, we can write our first parser. It pulls the JSON blobs embedded in the search page and prints the results.

```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(search_info, location, retries=3):
    formatted_locality = search_info["locality"].replace(" ", "-")
    url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            script_tags = soup.select("script[type='application/ld+json']")
            for script in script_tags:
                json_data = json.loads(script.text)
                if type(json_data) != list:
                    continue
                product = {}
                for element in json_data:
                    if element["@type"] == "Product":
                        product = element
                        break
                search_data = {
                    "name": product["name"],
                    "price": product["offers"]["price"],
                    "price_currency": product["offers"]["priceCurrency"],
                    "url": product["url"]
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    ## Job Processes
    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")
        scrape_search_results(search_area, LOCATION, retries=MAX_RETRIES)
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
`location_list` is an array of `dict` objects that we wish to crawl. We use a `dict` because each locality has 3 variables we need: `"id_number"`, `"state"`, and `"locality"`.

Our listing data is embedded in the page as JSON inside `script[type='application/ld+json']` tags. We pull all of these tags and keep only the elements with a type of `"Product"`, which leaves us with only our listings to deal with. We then pull the `"name"`, `"price"`, `"price_currency"`, and `"url"` from each product.

Next, we need pagination, so we add `page-{page_number+1}` to the end of our URL. We use `page_number+1` because our `range()` function begins counting at 0 and our pages begin at 1. Here is our new URL format:

```
https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}
```
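As a quick sanity check on that off-by-one handling:

```python
pages = 3
# range(pages) yields 0, 1, 2, which map to page-1, page-2, page-3
print([f"page-{page_number+1}" for page_number in range(pages)])
# ['page-1', 'page-2', 'page-3']
```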
We also write a `start_scrape()` function, which runs our parsing function on a list of pages.

```python
def start_scrape(search_info, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(search_info, location, page, retries=retries)
```
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(search_info, location, page_number, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script in script_tags: json_data = json.loads(script.text) if type(json_data) != list: continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = { "name": product["name"], "price": product["offers"]["price"], "price_currency": product["offers"]["priceCurrency"], "url": product["url"] } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, retries=3): for page in range(pages): scrape_search_results(search_info, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] ## Job Processes for search_area in location_list: filename = search_area["locality"].replace(" ", "-") start_scrape(search_area, PAGES, LOCATION, retries=MAX_RETRIES) aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
In this iteration, our parsing function takes a `page_number` so we can request specific pages, and `start_scrape()` allows us to parse a list of pages.

Next, we need proper data storage. To do this, we'll add two classes: `SearchData` and `DataPipeline`. `SearchData` will represent individual listing objects and `DataPipeline` will pipe these objects to a CSV file.

Take a look at `SearchData`. It holds all the information we were scraping with our parsing function.

```python
@dataclass
class SearchData:
    name: str = ""
    price: int = 0
    price_currency: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
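As a quick illustration of what `__post_init__` does here (the values are our own examples, not scraper output):

```python
# Empty strings get a default value; other strings get stripped.
item = SearchData(name="  123 Ocean Blvd  ", price=250000, price_currency="USD", url="")
print(item.name)  # "123 Ocean Blvd"
print(item.url)   # "No url"
```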
Here is the `DataPipeline` we use to store the objects above inside a CSV file.

```python
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        # Note: this method calls time.sleep(), so the script needs `import time` at the top.
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
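Roughly, the pipeline gets used like this. This is a minimal sketch with made-up values, assuming the `SearchData` class above is defined.

```python
pipeline = DataPipeline(csv_filename="example.csv", storage_queue_limit=50)

pipeline.add_data(SearchData(name="Listing A", price=250000, price_currency="USD", url="https://www.redfin.com/..."))
# Duplicate names get dropped by is_duplicate()
pipeline.add_data(SearchData(name="Listing A", price=250000, price_currency="USD", url="https://www.redfin.com/..."))

# add_data() only writes once the queue hits storage_queue_limit;
# close_pipeline() flushes whatever is left to example.csv
pipeline.close_pipeline()
```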
In our main section, we now open a `DataPipeline` and pass it into `start_scrape()`. From the parsing function, we then convert all of our scraped data into `SearchData` and pass it into the `DataPipeline`
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script in script_tags: json_data = json.loads(script.text) if type(json_data) != list: continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(search_info, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] ## Job Processes for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
We now use `SearchData` to represent real results from the search, and these objects get passed into the `DataPipeline` for storage inside our CSV file.

Next, we'll add concurrency with `ThreadPoolExecutor`. This will replace our `for` loop that iterates through the list of pages. Here is our new `start_scrape()`.

```python
def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [search_info] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
```
Take note of the arguments we pass into `executor.map()`:

- `scrape_search_results`: the function we wish to call on each thread.
- `search_info`: passed in as an array the length of our page list.
- `location`: passed in as an array the length of our page list.
- `range(pages)`: our list of pages.
- `data_pipeline`: passed in as an array the length of our page list.
- `retries`
: also passed in as an array the length of our page list.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script in script_tags: json_data = json.loads(script.text) if type(json_data) != list: continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [search_info] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] ## Job Processes for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Our proxy function sends a `payload` to ScrapeOps that looks like this:

- `"api_key"`: your ScrapeOps API key.
- `"url"`: the url that you'd like to scrape.
- `"country"`: the country that we want to appear in.
- `"wait"`: the amount of time we want ScrapeOps to wait before sending our response back. This allows our page to render.

The function below URL-encodes that payload into a ScrapeOps proxy URL:

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 3000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
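For example, wrapping one of our search URLs looks roughly like this (illustrative call; the exact query-string ordering comes from `urlencode`):

```python
proxied = get_scrapeops_url("https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-1", location="us")
print(proxied)
# roughly: https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.redfin.com%2F...&country=us&wait=3000
```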
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script in script_tags: json_data = json.loads(script.text) if type(json_data) != list: continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [search_info] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] ## Job Processes for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Now it's time to run the crawler in production. Take a look at our `main` below and feel free to change any of the following:

- `MAX_RETRIES`
- `MAX_THREADS`
- `PAGES`
- `LOCATION`
- `location_list`

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    ## Job Processes
    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
Next, we need to scrape the individual listings from our crawl. It all starts with our `process_listing()` function.

```python
def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        # No proxy yet; ScrapeOps geolocation gets added in a later step.
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                bedrooms = 0
                bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']")
                if bedroom_holder:
                    bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0"))

                bathrooms = 0.0
                bathroom_holder = soup.select_one("div[data-rf-test-id='abp-baths']")
                if bathroom_holder:
                    bathrooms = float(bathroom_holder.find("div", class_="statsValue").text.replace("—", "0"))

                square_feet = 0
                size_holder = soup.select_one("div[data-rf-test-id='abp-sqFt']")
                if size_holder:
                    square_feet = int(size_holder.find("span", class_="statsValue").text.replace(",", ""))

                price_differential = 0
                difference_holder = soup.select_one("span[data-rf-test-name='avmDiffValue']")
                if difference_holder:
                    price_number = int(difference_holder.text.replace(",", ""))
                    # get("class") returns a list of class names, e.g. ["diffValue", "red"]
                    color = difference_holder.get("class")
                    if "red" in color:
                        price_differential = -price_number
                    else:
                        price_differential = price_number

                property_data = {
                    "name": row["name"],
                    "bedrooms": bedrooms,
                    "bathrooms": bathrooms,
                    "square_feet": square_feet,
                    "price_differential": price_differential
                }
                print(property_data)
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
Here's how we extract each field:

- We find our `bedroom_holder` with `soup.select_one("div[data-rf-test-id='abp-beds']")` and, if there are bedrooms present on the page, we extract them.
- We do the same with our `bathroom_holder`, `"div[data-rf-test-id='abp-baths']"`.
- We find our `size_holder`, `soup.select_one("div[data-rf-test-id='abp-sqFt']")`, and extract its value if there is one present.
- We pull the price differential from `soup.select_one("span[data-rf-test-name='avmDiffValue']")` (see the sketch below).
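The sign of the price differential comes from the element's CSS classes. Here's a hedged sketch of that check, using a made-up HTML fragment; as in the scraper above, a `red` class gets treated as a negative differential.

```python
from bs4 import BeautifulSoup

html = '<span data-rf-test-name="avmDiffValue" class="diffValue red">12,500</span>'
soup = BeautifulSoup(html, "html.parser")
holder = soup.select_one("span[data-rf-test-name='avmDiffValue']")

price_number = int(holder.text.replace(",", ""))
# get("class") returns a list like ["diffValue", "red"]
price_differential = -price_number if "red" in holder.get("class") else price_number
print(price_differential)  # -12500
```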
Next, we need a runner function similar to `start_scrape()`; we'll call this one `process_results()`.

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_listing(row, location, retries=retries)
```
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script in script_tags: json_data = json.loads(script.text) if type(json_data) != list: continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [search_info] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") bedrooms = 0 bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']") if bedroom_holder: bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0")) bathrooms = 0.0 bathroom_holder = soup.select_one("div[data-rf-test-id='abp-baths']") if bathroom_holder: bathrooms = float(bathroom_holder.find("div", class_="statsValue").text.replace("—", "0")) square_feet = 0 size_holder = soup.select_one("div[data-rf-test-id='abp-sqFt']") if size_holder: square_feet = int(size_holder.find("span", class_="statsValue").text.replace(",", "")) price_differential = 0 difference_holder = soup.select_one("span[data-rf-test-name='avmDiffValue']") if difference_holder: price_number = int(difference_holder.text.replace(",", "")) color = difference_holder.get("class") if color == "diffValue red": price_differential = -price_number else: price_differential = price_number property_data = { "name": row["name"], "bedrooms": bedrooms, "bathrooms": bathrooms, "square_feet": square_feet, "price_differential": price_differential } 
print(property_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_listing(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] ## Job Processes for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
`process_results()` reads our CSV file and then runs `process_listing()` on each result from the file.

To store this new data properly, we need another `dataclass` to work with. We'll call this one `PropertyData`. It's just like our `SearchData`, but it holds different fields.

```python
@dataclass
class PropertyData:
    name: str = ""
    bedrooms: int = 0
    bathrooms: float = 0.0
    square_feet: int = 0
    price_differential: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
From inside `process_listing()`, we now open a new `DataPipeline` and pass these new `PropertyData`
objects into it.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass PropertyData: name: str = "" bedrooms: int = 0 bathrooms: float = 0.0 square_feet: int = 0 price_differential: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script in script_tags: json_data = json.loads(script.text) if type(json_data) != list: continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [search_info] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") bedrooms = 0 bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']") if bedroom_holder: bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0")) bathrooms = 0.0 bathroom_holder = soup.select_one("div[data-rf-test-id='abp-baths']") if bathroom_holder: bathrooms = float(bathroom_holder.find("div", class_="statsValue").text.replace("—", "0")) square_feet = 0 size_holder = soup.select_one("div[data-rf-test-id='abp-sqFt']") if size_holder: square_feet = int(size_holder.find("span", class_="statsValue").text.replace(",", "")) price_differential = 0 difference_holder = soup.select_one("span[data-rf-test-name='avmDiffValue']") if difference_holder: price_number = int(difference_holder.text.replace(",", "")) color = difference_holder.get("class") if color == "diffValue red": price_differential = -price_number else: price_differential = price_number property_pipeline = DataPipeline(f"{row['name'].replace(' ', '-')}.csv") property_data = PropertyData( name=row["name"], bedrooms=bedrooms, bathrooms=bathrooms, 
square_feet=square_feet, price_differential=price_differential ) property_pipeline.add_data(property_data) property_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_listing(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] ## Job Processes for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
In the code above, we open a new `DataPipeline` from inside `process_listing()`. We pass our `PropertyData` objects into this new pipeline so each property gets its own individual report.

Next, we'll use `ThreadPoolExecutor` for concurrency just like we did before. We just need to refactor a `for` loop.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
Once again, pay attention to the arguments of `executor.map()`:

- `process_listing`: the function we want to call on each available thread.
- The remaining arguments (the CSV rows, `location`, and `retries`): each passed in as an array so they line up with the calls to `process_listing`.

Finally, we need to add proxy support to `process_listing()`. Look at the line below; it holds the key to everything.

```python
response = requests.get(get_scrapeops_url(url, location=location))
```
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass PropertyData: name: str = "" bedrooms: int = 0 bathrooms: float = 0.0 square_feet: int = 0 price_differential: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script in script_tags: json_data = json.loads(script.text) if type(json_data) != list: continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [search_info] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") bedrooms = 0 bedroom_holder = soup.select_one("div[data-rf-test-id='abp-beds']") if bedroom_holder: bedrooms = int(bedroom_holder.find("div", class_="statsValue").text.replace("—", "0")) bathrooms = 0.0 bathroom_holder = soup.select_one("div[data-rf-test-id='abp-baths']") if bathroom_holder: bathrooms = float(bathroom_holder.find("div", class_="statsValue").text.replace("—", "0")) square_feet = 0 size_holder = soup.select_one("div[data-rf-test-id='abp-sqFt']") if size_holder: square_feet = int(size_holder.find("span", class_="statsValue").text.replace(",", "")) price_differential = 0 difference_holder = soup.select_one("span[data-rf-test-name='avmDiffValue']") if difference_holder: price_number = int(difference_holder.text.replace(",", "")) color = difference_holder.get("class") if color == "diffValue red": price_differential = -price_number else: price_differential = price_number property_pipeline = DataPipeline(f"{row['name'].replace(' ', '-')}.csv") property_data = PropertyData( name=row["name"], bedrooms=bedrooms, 
bathrooms=bathrooms, square_feet=square_feet, price_differential=price_differential ) property_pipeline.add_data(property_data) property_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_listing, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] ## Job Processes for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To run this crawler in production, all you need to change are the constants inside of main. As before, we'll be running a 3-page crawl.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    ## Job Processes
    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
When you access Redfin, you're subject to both their terms of service and their robots.txt. You can view Redfin's terms here. Their robots.txt is available for review here. Violating any of these policies could result in suspension or even permanent deletion of your account.
Today, we scraped publicly available data. Based on the outcomes of numerous court cases, scraping public data is generally considered legal. If you're scraping private data (data gated behind a login), that's a completely different story. If you're unsure of the legality of your scraper, contact an attorney.
Then check out ScrapeOps, the complete toolkit for web scraping.
config.json
file with your API key {"api_key": "your-super-secret-api-key"}
.import os import csv import json import logging import time import concurrent.futures from dataclasses import dataclass, field, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from webdriver_manager.chrome import ChromeDriverManager from urllib.parse import urlencode API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclass class PropertyData: name: str = "" bedrooms: int = 0 bathrooms: float = 0.0 square_feet: int = 0 price_differential: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty, set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() # Scrape search results function def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" options = Options() options.add_argument("--headless=new") # Use 'new' headless mode for Chrome options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36") tries = 0 success = False while tries <= retries and not success: try: # Use the ScrapeOps proxy URL scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) driver.get(scrapeops_proxy_url) logger.info("Waiting for page to load...") # Increase the wait time for the page to load WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']"))) # Once we find the script tag, extract its content script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']") if not script_tags: raise Exception("No script tags found on the page.") for script in script_tags: json_data = json.loads(script.get_attribute('innerText')) if not isinstance(json_data, list): continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") tries += 1 finally: driver.quit() # Ensure the driver is closed after each try if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [search_info] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) # Function to process a single listing using Selenium def process_listing(driver, row, location, retries): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: # Use the ScrapeOps proxy URL scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Processing URL: {url}") # Wait until the page is fully loaded WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']"))) # Extract bedroom information try: bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']") bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, 
"statsValue").text.replace("—", "0")) except NoSuchElementException: bedrooms = 0 # Extract bathroom information try: bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']") bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0")) except NoSuchElementException: bathrooms = 0.0 # Extract square feet information try: size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']") square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", "")) except NoSuchElementException: square_feet = 0 # Extract price differential information try: difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']") price_number = int(difference_holder.text.replace(",", "")) color = difference_holder.get_attribute("class") if "diffValue red" in color: price_differential = -price_number else: price_differential = price_number except NoSuchElementException: price_differential = 0 # Create a new DataPipeline instance for each property property_filename = f"{row['name'].replace(' ', '-')}.csv" property_pipeline = DataPipeline(csv_filename=property_filename) # Reset names_seen for the new pipeline instance property_pipeline.names_seen = [] # Create a PropertyData instance property_data = PropertyData( name=row["name"], bedrooms=bedrooms, bathrooms=bathrooms, square_feet=square_feet, price_differential=price_differential ) # Add property data to the pipeline and save to individual CSV property_pipeline.add_data(property_data) property_pipeline.close_pipeline() logger.info(f"Successfully parsed property data: {asdict(property_data)}") success = True except TimeoutException: logger.warning(f"Page load timeout for URL: {url}") tries += 1 except Exception as e: logger.error(f"Exception occurred while processing {url}: {e}") tries += 1 finally: if tries > retries: logger.error(f"Max retries reached for URL: {url}") raise Exception(f"Max retries exceeded for {url}") def process_results(driver, csv_file, location, max_threads=5, retries=3): logger.info(f"Processing results from {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: executor.map( process_listing, [driver] * len(reader), reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": start_time = time.time() MAX_RETRIES = 3 MAX_THREADS = 1 PAGES = 3 LOCATION = "us" logger.info(f"Crawl starting...") location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Processing individual listings from CSV...") options = Options() options.add_argument("--headless=new") # Use 'new' headless mode for Chrome driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) for file in aggregate_files: process_results(driver, file, LOCATION, retries=MAX_RETRIES) driver.quit() logger.info(f"Crawl complete.") end_time = time.time() # Record end time execution_time = end_time - start_time logger.info(f"Total execution time: {execution_time:.2f} seconds.")
python name_of_your_script.py
MAX_THREADS: Limits the number of concurrent threads used during scraping.
MAX_RETRIES: Determines how many times a request will be retried if it fails (e.g., due to a timeout or a 500 error).
PAGES: Defines how many pages to scrape per location.
LOCATION: Specifies the geographic region (country) the request originates from.
location_list: A list of dictionaries containing details for each search area, such as city ID (id_number), state, and locality.
A Redfin search results URL looks like this:
https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-2
Search URLs follow this format:
https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number}
Individual listing URLs look like this:
https://www.redfin.com/SC/Myrtle-Beach/1501-N-Ocean-Blvd-29577/unit-232/home/170856032
They follow this format:
https://www.redfin.com/{state}/{city}/{address}/unit-{unit_number}/home/{listing_id}
Our crawler only needs to build the search URL format, https://www.redfin.com/city/{id_number}/{state}/{city}/page-{page_number}, where pagination is controlled by page-{page_number}. We'll use page_number+1 because Python's range() starts counting from 0, while our pages start at 1.
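To make the pagination concrete, here is a minimal sketch (using the Myrtle Beach values from this guide) of how a zero-based page_number turns into the page numbers Redfin expects:

# Minimal illustration of the pagination math used in this guide.
# search_info mirrors the location_list entries shown later.
search_info = {"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}
formatted_locality = search_info["locality"].replace(" ", "-")

for page_number in range(3):  # range() yields 0, 1, 2
    url = (
        f"https://www.redfin.com/city/{search_info['id_number']}"
        f"/{search_info['state']}/{formatted_locality}/page-{page_number+1}"
    )
    print(url)  # prints page-1, page-2, page-3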
If we pass {"country": "us"} to the ScrapeOps Proxy API, it will direct us through a server in the US. You can check out the full list of supported countries here.
To set up the project, create a new folder and move into it:
mkdir redfin-scraper
cd redfin-scraper
python -m venv venv
source venv/bin/activate
pip install selenium
pip install webdriver-manager
import os import json import logging from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures from dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def scrape_search_results(search_info, location, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}" tries = 0 success = False chrome_options = Options() chrome_options.add_argument("--headless") # Run headless if needed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") # Use webdriver-manager to automatically manage ChromeDriver service = Service(ChromeDriverManager().install()) driver = webdriver.Chrome(service=service, options=chrome_options) while tries <= retries and not success: try: driver.get(url) logger.info(f"Received response from: {url}") script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']") for script in script_tags: json_data = json.loads(script.get_attribute("innerText")) if type(json_data) != list: continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = { "name": product["name"], "price": product["offers"]["price"], "price_currency": product["offers"]["priceCurrency"], "url": product["url"] } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") tries += 1 driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] # Job Processes for search_area in location_list: filename = search_area["locality"].replace(" ", "-") scrape_search_results(search_area, LOCATION, retries=MAX_RETRIES) aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
To paginate our results, we append page-{page_number+1} to the end of the URL. We use page_number+1 because the range() function starts counting from 0, while our pages start at 1. So, here's how our new URL format will look:
https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}
We'll also add a new function, start_scrape(), which kicks off our parsing function on a bunch of pages.

def start_scrape(search_info, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(search_info, location, page, retries=retries)
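As a quick usage sketch (parameter values borrowed from the main block in this guide), kicking off a crawl for a single search area looks like this:

# Usage sketch: crawl 3 pages of Myrtle Beach search results.
search_area = {"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}
start_scrape(search_area, pages=3, location="us", retries=3)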
import os import json import logging from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from webdriver_manager.chrome import ChromeDriverManager from dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def scrape_search_results(search_info, location, page_number, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" tries = 0 success = False chrome_options = Options() chrome_options.add_argument("--headless") # Run headless if needed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") # Use webdriver-manager to automatically manage ChromeDriver service = Service(ChromeDriverManager().install()) driver = webdriver.Chrome(service=service, options=chrome_options) while tries <= retries and not success: try: driver.get(url) logger.info(f"Received response from: {url}") script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']") for script in script_tags: json_data = json.loads(script.get_attribute("innerText")) if type(json_data) != list: continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = { "name": product["name"], "price": product["offers"]["price"], "price_currency": product["offers"]["priceCurrency"], "url": product["url"] } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") tries += 1 driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, retries=3): for page in range(pages): scrape_search_results(search_info, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 3 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] # Job Processes for search_area in location_list: filename = search_area["locality"].replace(" ", "-") start_scrape(search_area, PAGES, LOCATION, retries=MAX_RETRIES) aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Our start_scrape() function helps us read through a list of pages. To store the data we extract, we'll add two new classes: SearchData and DataPipeline. SearchData will represent each listing we scrape, while DataPipeline will help send these listings to a CSV file. Let's check out SearchData first. It contains all the info we gathered with our parsing function.

@dataclass
class SearchData:
    name: str = ""
    price: int = 0
    price_currency: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                # If a string field is empty, set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any leading/trailing whitespace
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
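To make the __post_init__ behavior concrete, here is a small, hypothetical example (the values are made up) of how SearchData cleans its string fields:

# Hypothetical example: empty strings get default text, other strings get stripped.
item = SearchData(name="  123 Ocean Blvd  ", price=250000, price_currency="USD", url="")
print(item.name)  # "123 Ocean Blvd"  (whitespace stripped)
print(item.url)   # "No url"          (empty string replaced with default text)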
Next up is our DataPipeline. It filters out duplicate results by name and pipes our data into a CSV file.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
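Here is a brief, hypothetical usage sketch showing how the pipeline is meant to be used: add dataclass objects, let it batch writes to CSV, and close it when you're done.

# Hypothetical usage sketch for DataPipeline.
pipeline = DataPipeline(csv_filename="example.csv", storage_queue_limit=50)
pipeline.add_data(SearchData(name="Listing A", price=250000, price_currency="USD", url="https://www.redfin.com/..."))
pipeline.add_data(SearchData(name="Listing A", price=250000, price_currency="USD", url="https://www.redfin.com/..."))  # duplicate name, dropped
pipeline.close_pipeline()  # flushes anything left in the queue to example.csv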
In the full code below, we open a DataPipeline from main and pass it into our start_scrape() function. After that, we take all the data we collected and change it into SearchData
, which we then send into the DataPipeline.import os import csv import json import logging from dataclasses import dataclass, field, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" options = Options() options.add_argument("--headless=new") # Use 'new' headless mode for Chrome options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36") tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) driver.get(url) logger.info("Waiting for page to load...") # Increase the wait time for the page to load WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']"))) # Once we find the script tag, extract its content script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']") if not script_tags: raise Exception("No script tags found on the page.") for script in script_tags: json_data = json.loads(script.get_attribute('innerText')) if not isinstance(json_data, list): continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except TimeoutException: logger.error(f"Timeout while waiting for page: {url}") tries += 1 except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") tries += 1 finally: driver.quit() # Ensure the driver is closed after each try if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(search_info, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
In the updated code, we now use SearchData to show the actual results from the search. Then, we send these results into our DataPipeline, where they are saved in a CSV file.
Next, we'll add concurrency with ThreadPoolExecutor, which will take the place of our for loop that goes through the list of pages. Here's our updated start_scrape() function.

def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [search_info] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
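If the repeated-list arguments look odd, this is just how executor.map() lines up its arguments: it takes one element from each iterable per call. Here's a tiny, self-contained sketch (fake_scrape is a stand-in function, not part of the scraper):

import concurrent.futures

# Toy example of executor.map() with repeated arguments.
def fake_scrape(search_info, location, page_number):
    return f"{search_info['locality']} | {location} | page {page_number + 1}"

pages = 3
search_info = {"locality": "Myrtle Beach"}

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        fake_scrape,
        [search_info] * pages,   # same search_info for every call
        ["us"] * pages,          # same location for every call
        range(pages)             # 0, 1, 2 -> one call per page
    )
    for result in results:
        print(result)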
Our other arguments get passed in as lists, which are then fed, one element at a time, into executor.map()
:import os import csv import json import logging import time import concurrent.futures from dataclasses import dataclass, field, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" options = Options() options.add_argument("--headless=new") # Use 'new' headless mode for Chrome options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36") tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) driver.get(url) logger.info("Waiting for page to load...") # Increase the wait time for the page to load WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']"))) # Once we find the script tag, extract its content script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']") if not script_tags: raise Exception("No script tags found on the page.") for script in script_tags: json_data = json.loads(script.get_attribute('innerText')) if not isinstance(json_data, list): continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") tries += 1 finally: driver.quit() # Ensure the driver is closed after each try if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [search_info] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 3000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
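As a quick sanity check, here is roughly what calling the function looks like. The wrapped URL is what we hand to the browser instead of the raw Redfin URL (the exact query-string contents depend on your API key and encoding order):

# Usage sketch for get_scrapeops_url().
target = "https://www.redfin.com/city/12572/SC/Myrtle-Beach/page-1"
proxied = get_scrapeops_url(target, location="us")
print(proxied)
# Something like:
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.redfin.com%2Fcity%2F12572%2FSC%2FMyrtle-Beach%2Fpage-1&country=us&wait=3000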
import os import csv import json import logging import time import concurrent.futures from dataclasses import dataclass, field, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager from urllib.parse import urlencode API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" options = Options() options.add_argument("--headless=new") # Use 'new' headless mode for Chrome options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36") tries = 0 success = False while tries <= retries and not success: try: # Use the ScrapeOps proxy URL scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) driver.get(scrapeops_proxy_url) logger.info("Waiting for page to load...") # Increase the wait time for the page to load WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']"))) # Once we find the script tag, extract its content script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']") if not script_tags: raise Exception("No script tags found on the page.") for script in script_tags: json_data = json.loads(script.get_attribute('innerText')) if not isinstance(json_data, list): continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") tries += 1 finally: driver.quit() # Ensure the driver is closed after each try if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [search_info] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
To run the crawler in production, feel free to change any of the following constants: MAX_RETRIES, MAX_THREADS, PAGES, and LOCATION.
if __name__ == "__main__":
    start_time = time.time()  # Start time

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
Now that the crawler is running, we'll move on to the scraper. Just like before, we'll start with a parsing function. Here is our process_listing() function.

def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    options = Options()
    options.add_argument("--headless=new")  # Use 'new' headless mode for Chrome
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--disable-gpu")
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
            driver.get(url)
            logger.info("Waiting for page to load...")

            # Increase the wait time for the page to load
            WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))

            bedrooms = 0
            bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
            if bedroom_holder:
                bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))

            bathrooms = 0.0
            bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
            if bathroom_holder:
                bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))

            square_feet = 0
            size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
            if size_holder:
                square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))

            price_differential = 0
            difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
            if difference_holder:
                price_number = int(difference_holder.text.replace(",", ""))
                color = difference_holder.get_attribute("class")
                if color == "diffValue red":
                    price_differential = -price_number
                else:
                    price_differential = price_number

            property_data = {
                "name": row["name"],
                "bedrooms": bedrooms,
                "bathrooms": bathrooms,
                "square_feet": square_feet,
                "price_differential": price_differential
            }

            print(property_data)
            success = True
            logger.info(f"Successfully parsed: {row['url']}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, Retries left: {retries - tries}")
            tries += 1
        finally:
            driver.quit()  # Ensure the driver is closed after each try

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
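Each row passed into process_listing() comes from the crawler's CSV, so it mirrors the SearchData fields. A hypothetical call (the name and price here are made-up values; the URL format matches the listing URL shown earlier in this guide) looks like this:

# Hypothetical usage sketch: process a single row from the crawler's CSV.
row = {
    "name": "1501 N Ocean Blvd Unit 232",
    "price": "250000",
    "price_currency": "USD",
    "url": "https://www.redfin.com/SC/Myrtle-Beach/1501-N-Ocean-Blvd-29577/unit-232/home/170856032"
}
process_listing(row, location="us", retries=3)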
When parsing each listing page:
We find the bedrooms with driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']").
We find the bathrooms with driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']").
We check for the square footage with driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']") and pull the value if it's there.
We look for the price differential with driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']") and get that information too.
Next, we need a function similar to start_scrape(). We'll call this new one process_results().

def process_results(csv_file, location, retries=3):
    logger.info(f"Processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_listing(row, location, retries=retries)
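Hooking the two pieces together is then just a matter of feeding in the file the crawler produced. For example, once the crawl has written Myrtle-Beach.csv:

# Usage sketch: run the scraper over the crawler's output file.
process_results("Myrtle-Beach.csv", "us", retries=3)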
import os import csv import json import logging import time import concurrent.futures from dataclasses import dataclass, field, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from webdriver_manager.chrome import ChromeDriverManager from urllib.parse import urlencode API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() # Scrape search results function (unchanged) def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3): formatted_locality = search_info["locality"].replace(" ", "-") url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}" options = Options() options.add_argument("--headless=new") # Use 'new' headless mode for Chrome options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36") tries = 0 success = False while tries <= retries and not success: try: # Use the ScrapeOps proxy URL scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) driver.get(scrapeops_proxy_url) logger.info("Waiting for page to load...") # Increase the wait time for the page to load WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']"))) # Once we find the script tag, extract its content script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']") if not script_tags: raise Exception("No script tags found on the page.") for script in script_tags: json_data = json.loads(script.get_attribute('innerText')) if not isinstance(json_data, list): continue product = {} for element in json_data: if element["@type"] == "Product": product = element break search_data = SearchData( name=product["name"], price=product["offers"]["price"], price_currency=product["offers"]["priceCurrency"], url=product["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") tries += 1 finally: driver.quit() # Ensure the driver is closed after each try if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [search_info] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) # New function to process a single listing using Selenium def process_listing(driver, row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: # Use the ScrapeOps proxy URL scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Processing URL: {url}") # Wait until the page is fully loaded WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']"))) # Extract bedroom information try: bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']") bedrooms = 
int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0")) except NoSuchElementException: bedrooms = 0 # Extract bathroom information try: bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']") bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0")) except NoSuchElementException: bathrooms = 0.0 # Extract square feet information try: size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']") square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", "")) except NoSuchElementException: square_feet = 0 # Extract price differential information try: difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']") price_number = int(difference_holder.text.replace(",", "")) color = difference_holder.get_attribute("class") if "diffValue red" in color: price_differential = -price_number else: price_differential = price_number except NoSuchElementException: price_differential = 0 # Construct the property data dictionary property_data = { "name": row["name"], "bedrooms": bedrooms, "bathrooms": bathrooms, "square_feet": square_feet, "price_differential": price_differential } logger.info(f"Successfully parsed property data: {property_data}") success = True except TimeoutException: logger.warning(f"Page load timeout for URL: {url}") tries += 1 except Exception as e: logger.error(f"Exception occurred while processing {url}: {e}") tries += 1 finally: if tries > retries: logger.error(f"Max retries reached for URL: {url}") raise Exception(f"Max retries exceeded for {url}") # New function to process the results from a CSV def process_results(driver, csv_file, location, retries=3): logger.info(f"Processing results from {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_listing(driver, row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}] aggregate_files = [] for search_area in location_list: filename = search_area["locality"].replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Processing individual listings from CSV...") options = Options() options.add_argument("--headless=new") # Use 'new' headless mode for Chrome driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) for file in aggregate_files: process_results(driver, file, LOCATION, retries=MAX_RETRIES) driver.quit() logger.info(f"Crawl complete.")
Our process_results() function opens our CSV file and then goes through each entry in the file, applying the process_listing() function to them.
To store this data, we'll add another dataclass, PropertyData. It's similar to our SearchData, but it has different fields.

@dataclass
class PropertyData:
    name: str = ""
    bedrooms: int = 0
    bathrooms: float = 0.0
    square_feet: int = 0
    price_differential: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty, set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
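As with SearchData, a quick hypothetical example (made-up values) shows how PropertyData fills in its defaults:

# Hypothetical example of PropertyData.
prop = PropertyData(name="", bedrooms=3, bathrooms=2.5, square_feet=1200, price_differential=-5000)
print(prop.name)     # "No name" (empty string replaced by default text)
print(asdict(prop))  # ready to be written to CSV by the DataPipeline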
Inside process_listing(), we now spin up a DataPipeline and pass PropertyData
objects into it.import os import csv import json import logging import time import concurrent.futures from dataclasses import dataclass, field, fields, asdict from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from webdriver_manager.chrome import ChromeDriverManager from urllib.parse import urlencode API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 3000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" price: int = 0 price_currency: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str): if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclass class PropertyData: name: str = "" bedrooms: int = 0 bathrooms: float = 0.0 square_feet: int = 0 price_differential: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty, set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


# Scrape search results function
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
    formatted_locality = search_info["locality"].replace(" ", "-")
    url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"

    options = Options()
    options.add_argument("--headless=new")  # Use 'new' headless mode for Chrome
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--disable-gpu")
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Use the ScrapeOps proxy URL
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
            driver.get(scrapeops_proxy_url)

            logger.info("Waiting for page to load...")
            # Increase the wait time for the page to load
            WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))

            # Once we find the script tag, extract its content
            script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
            if not script_tags:
                raise Exception("No script tags found on the page.")

            for script in script_tags:
                json_data = json.loads(script.get_attribute('innerText'))
                if not isinstance(json_data, list):
                    continue

                product = {}
                for element in json_data:
                    if element["@type"] == "Product":
                        product = element
                        break

                search_data = SearchData(
                    name=product["name"],
                    price=product["offers"]["price"],
                    price_currency=product["offers"]["priceCurrency"],
                    url=product["url"]
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            tries += 1
        finally:
            driver.quit()  # Ensure the driver is closed after each try

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [search_info] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


# Function to process a single listing using Selenium
def process_listing(driver, row, location, retries):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver.get(url)
            logger.info(f"Processing URL: {url}")

            # Wait until the page is fully loaded
            WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))

            # Extract bedroom information
            try:
                bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
                bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
            except NoSuchElementException:
                bedrooms = 0

            # Extract bathroom information
            try:
                bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
                bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
            except NoSuchElementException:
                bathrooms = 0.0

            # Extract square feet information
            try:
                size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
                square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
            except NoSuchElementException:
                square_feet = 0

            # Extract price differential information
            try:
                difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
                price_number = int(difference_holder.text.replace(",", ""))
                color = difference_holder.get_attribute("class")
                if "diffValue red" in color:
                    price_differential = -price_number
                else:
                    price_differential = price_number
            except NoSuchElementException:
                price_differential = 0

            # Create a new DataPipeline instance for each property
            property_filename = f"{row['name'].replace(' ', '-')}.csv"
            property_pipeline = DataPipeline(csv_filename=property_filename)

            # Reset names_seen for the new pipeline instance
            property_pipeline.names_seen = []

            # Create a PropertyData instance
            property_data = PropertyData(
                name=row["name"],
                bedrooms=bedrooms,
                bathrooms=bathrooms,
                square_feet=square_feet,
                price_differential=price_differential
            )

            # Add property data to the pipeline and save to individual CSV
            property_pipeline.add_data(property_data)
            property_pipeline.close_pipeline()

            logger.info(f"Successfully parsed property data: {asdict(property_data)}")
            success = True

        except TimeoutException:
            logger.warning(f"Page load timeout for URL: {url}")
            tries += 1
        except Exception as e:
            logger.error(f"Exception occurred while processing {url}: {e}")
            tries += 1
        finally:
            if tries > retries:
                logger.error(f"Max retries reached for URL: {url}")
                raise Exception(f"Max retries exceeded for {url}")


def process_results(driver, csv_file, location, retries):
    logger.info(f"Processing results from {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_listing(driver, row, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Processing individual listings from CSV...")

    options = Options()
    options.add_argument("--headless=new")  # Use 'new' headless mode for Chrome
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

    for file in aggregate_files:
        process_results(driver, file, LOCATION, retries=MAX_RETRIES)

    driver.quit()
    logger.info(f"Crawl complete.")
To store the parsed listing data, we once again use a DataPipeline. We add PropertyData objects to this pipeline, making sure each property gets its own separate report.
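If you want to see that pattern in isolation, here is a minimal sketch of the per-property pipeline. It reuses the DataPipeline and PropertyData classes from above; the address and stats are made up purely for illustration.

# Minimal sketch: one DataPipeline (and therefore one CSV file) per property.
# The example_row values below are hypothetical, not scraped data.
example_row = {"name": "123 Example St, Myrtle Beach, SC 29577"}

property_pipeline = DataPipeline(
    csv_filename=f"{example_row['name'].replace(' ', '-')}.csv"
)
property_pipeline.add_data(PropertyData(
    name=example_row["name"],
    bedrooms=3,
    bathrooms=2.0,
    square_feet=1450,
    price_differential=5000
))
property_pipeline.close_pipeline()  # flush the queue so the CSV gets written

Because every listing opens its own pipeline instance, each property ends up in its own small CSV report.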
Next, we use ThreadPoolExecutor for running tasks at the same time, just like we did before. We just need to change up a for loop a bit.

def process_results(driver, csv_file, location, max_threads=5, retries=3):
    logger.info(f"Processing results from {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                [driver] * len(reader),
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
The first argument to executor.map() is process_listing, which is the function we want to run on each available thread. All the other arguments come in as arrays; executor.map() then feeds the elements of those arrays into process_listing one call at a time.
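If the repeated-list pattern looks odd, here is a tiny standalone example (separate from the scraper, with made-up values) showing how executor.map() lines the lists up element by element:

import concurrent.futures

def greet(name, location, retries):
    return f"{name} / {location} / {retries}"

names = ["listing-1", "listing-2", "listing-3"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each call receives one element from each iterable:
    # greet("listing-1", "us", 3), greet("listing-2", "us", 3), greet("listing-3", "us", 3)
    results = executor.map(greet, names, ["us"] * len(names), [3] * len(names))
    print(list(results))

This is exactly what happens in process_results(): every call gets the same driver, location, and retry count, but a different row from the CSV.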
Just like in the crawler, our proxy support comes from get_scrapeops_url(), which we call inside of process_listing(). Check out the line below; it's the key to making it all work!

scrapeops_proxy_url = get_scrapeops_url(url, location=location)
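To show roughly where that line lives, here is a stripped-down sketch of the top of process_listing() (retries, explicit waits, and parsing omitted; the full version appears in the production code below):

# Sketch only: the real process_listing() below adds retries, waits, and parsing.
def process_listing(driver, row, location, retries):
    url = row["url"]
    # Route the request through the ScrapeOps proxy instead of hitting Redfin directly
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)
    driver.get(scrapeops_proxy_url)

With the proxy wired in, here is our full production code: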
import os
import csv
import json
import logging
import time
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlencode

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 3000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    price: int = 0
    price_currency: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class PropertyData:
    name: str = ""
    bedrooms: int = 0
    bathrooms: float = 0.0
    square_feet: int = 0
    price_differential: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty, set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


# Scrape search results function
def scrape_search_results(search_info, location, page_number, data_pipeline=None, retries=3):
    formatted_locality = search_info["locality"].replace(" ", "-")
    url = f"https://www.redfin.com/city/{search_info['id_number']}/{search_info['state']}/{formatted_locality}/page-{page_number+1}"

    options = Options()
    options.add_argument("--headless=new")  # Use 'new' headless mode for Chrome
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--disable-gpu")
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36")

    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Use the ScrapeOps proxy URL
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
            driver.get(scrapeops_proxy_url)

            logger.info("Waiting for page to load...")
            # Increase the wait time for the page to load
            WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "script[type='application/ld+json']")))

            # Once we find the script tag, extract its content
            script_tags = driver.find_elements(By.CSS_SELECTOR, "script[type='application/ld+json']")
            if not script_tags:
                raise Exception("No script tags found on the page.")

            for script in script_tags:
                json_data = json.loads(script.get_attribute('innerText'))
                if not isinstance(json_data, list):
                    continue

                product = {}
                for element in json_data:
                    if element["@type"] == "Product":
                        product = element
                        break

                search_data = SearchData(
                    name=product["name"],
                    price=product["offers"]["price"],
                    price_currency=product["offers"]["priceCurrency"],
                    url=product["url"]
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            tries += 1
        finally:
            driver.quit()  # Ensure the driver is closed after each try

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(search_info, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [search_info] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


# Function to process a single listing using Selenium
def process_listing(driver, row, location, retries):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Use the ScrapeOps proxy URL
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Processing URL: {url}")

            # Wait until the page is fully loaded
            WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")))

            # Extract bedroom information
            try:
                bedroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-beds']")
                bedrooms = int(bedroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
            except NoSuchElementException:
                bedrooms = 0

            # Extract bathroom information
            try:
                bathroom_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-baths']")
                bathrooms = float(bathroom_holder.find_element(By.CLASS_NAME, "statsValue").text.replace("—", "0"))
            except NoSuchElementException:
                bathrooms = 0.0

            # Extract square feet information
            try:
                size_holder = driver.find_element(By.CSS_SELECTOR, "div[data-rf-test-id='abp-sqFt']")
                square_feet = int(size_holder.find_element(By.CLASS_NAME, "statsValue").text.replace(",", ""))
            except NoSuchElementException:
                square_feet = 0

            # Extract price differential information
            try:
                difference_holder = driver.find_element(By.CSS_SELECTOR, "span[data-rf-test-name='avmDiffValue']")
                price_number = int(difference_holder.text.replace(",", ""))
                color = difference_holder.get_attribute("class")
                if "diffValue red" in color:
                    price_differential = -price_number
                else:
                    price_differential = price_number
            except NoSuchElementException:
                price_differential = 0

            # Create a new DataPipeline instance for each property
            property_filename = f"{row['name'].replace(' ', '-')}.csv"
            property_pipeline = DataPipeline(csv_filename=property_filename)

            # Reset names_seen for the new pipeline instance
            property_pipeline.names_seen = []

            # Create a PropertyData instance
            property_data = PropertyData(
                name=row["name"],
                bedrooms=bedrooms,
                bathrooms=bathrooms,
                square_feet=square_feet,
                price_differential=price_differential
            )

            # Add property data to the pipeline and save to individual CSV
            property_pipeline.add_data(property_data)
            property_pipeline.close_pipeline()

            logger.info(f"Successfully parsed property data: {asdict(property_data)}")
            success = True

        except TimeoutException:
            logger.warning(f"Page load timeout for URL: {url}")
            tries += 1
        except Exception as e:
            logger.error(f"Exception occurred while processing {url}: {e}")
            tries += 1
        finally:
            if tries > retries:
                logger.error(f"Max retries reached for URL: {url}")
                raise Exception(f"Max retries exceeded for {url}")


def process_results(driver, csv_file, location, max_threads=5, retries=3):
    logger.info(f"Processing results from {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            executor.map(
                process_listing,
                [driver] * len(reader),
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    start_time = time.time()

    MAX_RETRIES = 3
    MAX_THREADS = 1
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Processing individual listings from CSV...")

    options = Options()
    options.add_argument("--headless=new")  # Use 'new' headless mode for Chrome
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

    for file in aggregate_files:
        process_results(driver, file, LOCATION, retries=MAX_RETRIES)

    driver.quit()
    logger.info(f"Crawl complete.")

    end_time = time.time()  # Record end time
    execution_time = end_time - start_time
    logger.info(f"Total execution time: {execution_time:.2f} seconds.")
Here is the main block again. If you want to tweak your own run, adjust MAX_RETRIES, MAX_THREADS, PAGES, and LOCATION.

if __name__ == "__main__":
    start_time = time.time()

    MAX_RETRIES = 3
    MAX_THREADS = 1
    PAGES = 3
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    location_list = [{"id_number": 12572, "state": "SC", "locality": "Myrtle Beach"}]
    aggregate_files = []

    for search_area in location_list:
        filename = search_area["locality"].replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(search_area, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Processing individual listings from CSV...")

    options = Options()
    options.add_argument("--headless=new")  # Use 'new' headless mode for Chrome
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

    for file in aggregate_files:
        process_results(driver, file, LOCATION, retries=MAX_RETRIES)

    driver.quit()
    logger.info(f"Crawl complete.")

    end_time = time.time()  # Record end time
    execution_time = end_time - start_time
    logger.info(f"Total execution time: {execution_time:.2f} seconds.")
Whenever you scrape Redfin, you're subject to their Terms of Service and robots.txt guidelines. You can view Redfin's terms here, and their robots.txt is available for review here. Violating these rules could lead to account suspension or even permanent deletion.

In this guide, we only scraped publicly available data. According to the outcomes of many court cases, scraping public data is generally considered legal. Scraping private data (data gated behind a login) is a completely different story.

If you're unsure about the legality of your scraper, consult an attorney.