Then check out ScrapeOps, the complete toolkit for web scraping.

To run the full scraper, create a config.json file with your API key and then add the script below. When you run it with the keyword "pr" (Puerto Rico), it spits out a file called pr.csv. It then reads this file and creates an individual report on each house from pr.csv.
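If you haven't set up your config.json yet, here is a minimal sketch of what the script expects. The only key it reads is api_key; the placeholder value below is just an example.

import json

# Write a bare-bones config.json containing the api_key field the scraper reads.
# Replace the placeholder with your real ScrapeOps API key.
with open("config.json", "w") as config_file:
    json.dump({"api_key": "YOUR-SCRAPEOPS-API-KEY"}, config_file, indent=2)

With the config in place, here is the full scraper.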
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    property_type: str = ""
    street_address: str = ""
    locality: str = ""
    region: str = ""
    postal_code: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class PropertyData:
    name: str = ""
    price: int = 0
    time_on_zillow: str = ""
    views: int = 0
    saves: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tags = soup.select("script[type='application/ld+json']")
            for script_tag in script_tags:
                json_data = json.loads(script_tag.text)
                if json_data["@type"] != "BreadcrumbList":
                    search_data = SearchData(
                        name=json_data["name"],
                        property_type=json_data["@type"],
                        street_address=json_data["address"]["streetAddress"],
                        locality=json_data["address"]["addressLocality"],
                        region=json_data["address"]["addressRegion"],
                        postal_code=json_data["address"]["postalCode"],
                        url=json_data["url"]
                    )
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_property(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                price_holder = soup.select_one("span[data-testid='price']")
                price = int(price_holder.text.replace("$", "").replace(",", ""))
                info_holders = soup.select("dt")
                time_listed = info_holders[0].text
                views = int(info_holders[2].text.replace(",", ""))
                saves = info_holders[4].text

                property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
                property_data = PropertyData(
                    name=row["name"],
                    price=price,
                    time_on_zillow=time_listed,
                    views=views,
                    saves=saves
                )
                property_pipeline.add_data(property_data)
                property_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_property,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["pr"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To change your results, you can tweak any of the following constants inside main:

MAX_THREADS: determines the maximum number of threads used for concurrent scraping and processing.
MAX_RETRIES: sets the maximum number of retries for each request in case of failure (e.g., network issues, server errors).
PAGES: specifies the number of pages to scrape for each keyword. Each page contains multiple property listings.
LOCATION: defines the geographical location for the scraping. This parameter is used to adjust the proxy location to simulate requests from a specific country.
keyword_list: a list of keywords representing different geographical areas or search terms on Zillow. Each keyword triggers a separate scraping job ("pr" is Puerto Rico; if you want to do Michigan, add "mi").
Take a look at this Zillow search URL:

https://www.zillow.com/pr/2_p/

Here, pr is our location. Individual property pages use URLs like this one:

https://www.zillow.com/homedetails/459-Carr-Km-7-2-Int-Bo-Arenales-Aguadilla-PR-00603/363559698_zpid/

Back in the search URL, https://www.zillow.com/pr/2_p/, 2_p actually denotes our page number, 2. If we want to search for page 1, our URL is https://www.zillow.com/pr/1_p/, page 3 would use 3_p, and so on.

Our searches go into a keyword_list. In the keyword_list, we'll hold the locations we'd like to scrape.

When interacting with the ScrapeOps API, we'll pass in a country param as well. country will not have any effect on our actual search results; instead, it routes us through a server in whichever country we specify. For instance, if we want to appear in the US, we'd pass us in as our country.

To get started, create a new project folder and a virtual environment:

mkdir zillow-scraper
cd zillow-scraper

python -m venv venv
source venv/bin/activate

Then install our dependencies:

pip install requests
pip install beautifulsoup4
scrape_search_results()
function.Take a look at the script so far.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.zillow.com/{keyword}/" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script_tag in script_tags: json_data = json.loads(script_tag.text) if json_data["@type"] != "BreadcrumbList": search_data = { "name": json_data["name"], "property_type": json_data["@type"], "street_address": json_data["address"]["streetAddress"], "locality": json_data["address"]["addressLocality"], "region": json_data["address"]["addressRegion"], "postal_code": json_data["address"]["postalCode"], "url": json_data["url"] } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") scrape_search_results(keyword, LOCATION, retries=retries) logger.info(f"Crawl complete.")
We find our JSON blobs with this line:

script_tags = soup.select("script[type='application/ld+json']")

If a blob does not have a "@type" of "BreadcrumbList", we parse its data and pull out the following fields (the sketch after this list shows roughly how that lookup works):

name
property_type
street_address
locality
region
postal_code
url
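To make the field mapping concrete, here is an illustrative parse of one of those blobs. The JSON below is a hypothetical stand-in, not a real Zillow response; only the key names mirror what our scraper actually reads.

import json

# Hypothetical example of a listing blob found inside a script[type='application/ld+json'] tag.
sample_blob = """
{
  "@type": "SingleFamilyResidence",
  "name": "459 Carr Km 7.2 Int Bo Arenales",
  "address": {
    "streetAddress": "459 Carr Km 7.2 Int Bo Arenales",
    "addressLocality": "Aguadilla",
    "addressRegion": "PR",
    "postalCode": "00603"
  },
  "url": "https://www.zillow.com/homedetails/363559698_zpid/"
}
"""

json_data = json.loads(sample_blob)
# Skip breadcrumb metadata, keep actual listings.
if json_data["@type"] != "BreadcrumbList":
    print(json_data["name"], json_data["address"]["postalCode"], json_data["url"])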
Our updated URL format looks like this:

https://www.zillow.com/{keyword}/{page_number+1}_p/

keyword is the location we'd like to search. {page_number+1}_p denotes our page number. We use page_number+1 because we'll be using Python's range() function to create our page list. range() starts counting at zero and Zillow starts its pages at 1, so we add 1 to our page number when we pass it into the URL.

We also update our start_scrape() function to support the pagination we just added.

def start_scrape(keyword, pages, location, max_threads=5, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
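As a quick sanity check, here is how range() maps onto Zillow's 1-indexed page URLs. This is just a throwaway snippet, not part of the scraper.

# range(3) yields 0, 1, 2 -> pages 1_p, 2_p, 3_p
keyword = "pr"
for page_number in range(3):
    print(f"https://www.zillow.com/{keyword}/{page_number+1}_p/")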
start_scrape()
function to support multiple pages, but all in all, our code isn't all that different.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script_tag in script_tags: json_data = json.loads(script_tag.text) if json_data["@type"] != "BreadcrumbList": search_data = { "name": json_data["name"], "property_type": json_data["@type"], "street_address": json_data["address"]["streetAddress"], "locality": json_data["address"]["addressLocality"], "region": json_data["address"]["addressRegion"], "postal_code": json_data["address"]["postalCode"], "url": json_data["url"] } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, max_threads=5, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, PAGES, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
To store our data properly, we first need a SearchData class. This class simply holds data.

@dataclass
class SearchData:
    name: str = ""
    property_type: str = ""
    street_address: str = ""
    locality: str = ""
    region: str = ""
    postal_code: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
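Because of __post_init__(), empty string fields get replaced with placeholder text and stray whitespace gets stripped. A quick illustration, assuming the class above is already defined in your script or session:

# Empty fields become "No <field_name>"; padded strings get stripped.
item = SearchData(name="  123 Example St  ", region="PR")
print(item.name)         # "123 Example St"
print(item.postal_code)  # "No postal_code"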
Next, we need somewhere to send that data: our DataPipeline. It takes in a dataclass (such as SearchData) and pipes it to a CSV file. This pipeline filters out our duplicates and then saves the data to a CSV file. Additionally, our pipeline writes the file safely: if the CSV already exists, we append to it; otherwise, the pipeline creates it.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
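Here is a small, hypothetical usage sketch showing how the pipeline is meant to be driven; the filename is arbitrary and the SearchData class above is assumed to be defined.

# Feed a couple of items into a pipeline and flush them to disk.
pipeline = DataPipeline(csv_filename="example.csv")
pipeline.add_data(SearchData(name="123 Example St", region="PR"))
pipeline.add_data(SearchData(name="123 Example St", region="PR"))  # duplicate name, gets dropped
pipeline.add_data(SearchData(name="456 Sample Ave", region="PR"))
pipeline.close_pipeline()  # writes any queued rows to example.csv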
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" property_type: str = "" street_address: str = "" locality: str = "" region: str = "" postal_code: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script_tag in script_tags: json_data = json.loads(script_tag.text) if json_data["@type"] != "BreadcrumbList": search_data = SearchData( name=json_data["name"], property_type=json_data["@type"], street_address=json_data["address"]["streetAddress"], locality=json_data["address"]["addressLocality"], region=json_data["address"]["addressRegion"], postal_code=json_data["address"]["postalCode"], url=json_data["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
To crawl multiple pages concurrently, we'll replace our for loop with ThreadPoolExecutor. We'll also add a max_threads argument to start_scrape(). Take a look at the snippet below.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
Pay attention to the arguments we pass into executor.map():

scrape_search_results is the function we'd like to run on each thread.
All of our other arguments get passed in as lists, which executor.map() then passes into scrape_search_results()
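If executor.map() with multiple iterables looks unfamiliar, it simply zips the iterables together and hands one element from each to every call. A minimal, unrelated example:

import concurrent.futures

def show(page, keyword, retries):
    print(f"page={page}, keyword={keyword}, retries={retries}")

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Equivalent to calling show(0, "pr", 3), show(1, "pr", 3), show(2, "pr", 3) across threads.
    executor.map(show, range(3), ["pr"] * 3, [3] * 3)

With that in mind, here is our full code up to this point.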
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" property_type: str = "" street_address: str = "" locality: str = "" region: str = "" postal_code: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") for script_tag in script_tags: json_data = json.loads(script_tag.text) if json_data["@type"] != "BreadcrumbList": search_data = SearchData( name=json_data["name"], property_type=json_data["@type"], street_address=json_data["address"]["streetAddress"], locality=json_data["address"]["addressLocality"], region=json_data["address"]["addressRegion"], postal_code=json_data["address"]["postalCode"], url=json_data["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
To get past Zillow's anti-bot measures, we route our requests through the ScrapeOps Proxy. The function below takes any URL and returns a ScrapeOps proxied version of it.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

Pay attention to the payload we send to ScrapeOps:

"api_key": our ScrapeOps API key.
"url": the url of the site we'd like to scrape.
"country": the country we'd like to be routed through.
"residential": a boolean value. If we set this to True
, we're telling ScrapeOps to give us a residential IP address which decreases our likelihood of getting blocked.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" property_type: str = "" street_address: str = "" locality: str = "" region: str = "" postal_code: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") excluded_types = ["BreadcrumbList", "Event"] for script_tag in script_tags: json_data = json.loads(script_tag.text) if json_data["@type"] not in excluded_types: search_data = SearchData( name=json_data["name"], property_type=json_data["@type"], street_address=json_data["address"]["streetAddress"], locality=json_data["address"]["addressLocality"], region=json_data["address"]["addressRegion"], postal_code=json_data["address"]["postalCode"], url=json_data["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Now let's run our crawler in production. All we need to do is adjust the settings inside main.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["pr"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
This time, we set PAGES to 5 and LOCATION to "us". Feel free to change any of these constants in main to tweak your results. Here are our results.

Next, we need a function that parses an individual property page: process_property(). For the moment, it prints its results rather than saving them.

def process_property(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                price_holder = soup.select_one("span[data-testid='price']")
                price = int(price_holder.text.replace("$", "").replace(",", ""))
                info_holders = soup.select("dt")
                time_listed = info_holders[0].text
                views = int(info_holders[2].text.replace(",", ""))
                saves = info_holders[4].text

                property_data = {
                    "name": row["name"],
                    "price": price,
                    "time_on_zillow": time_listed,
                    "views": views,
                    "saves": saves
                }
                print(property_data)
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
"span[data-testid='price']"
is the CSS selector of our price_holder
.int(price_holder.text.replace("$", "").replace(",", ""))
gives us our actual price and converts it to an integer.info_holders
with soup.select("dt")
time_listed
, views
, and saves
from the info_holders
array.for
each row in the file, we run process_property()
on that row. Later, we'll add concurrency to this function just like we did eariler.def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_property(row, location, retries=retries)
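Each row that csv.DictReader() yields is just a dict keyed by the crawler's column names, which is why process_property() can read row["name"] and row["url"]. A quick peek, assuming pr.csv already exists from the crawl:

import csv

# Print the name and url of each property the crawler saved.
with open("pr.csv", newline="") as file:
    for row in csv.DictReader(file):
        print(row["name"], row["url"])

Here is our full code up to this point.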
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" property_type: str = "" street_address: str = "" locality: str = "" region: str = "" postal_code: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") excluded_types = ["BreadcrumbList", "Event"] for script_tag in script_tags: json_data = json.loads(script_tag.text) if json_data["@type"] not in excluded_types: search_data = SearchData( name=json_data["name"], property_type=json_data["@type"], street_address=json_data["address"]["streetAddress"], locality=json_data["address"]["addressLocality"], region=json_data["address"]["addressRegion"], postal_code=json_data["address"]["postalCode"], url=json_data["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_property(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") price_holder = soup.select_one("span[data-testid='price']") price = int(price_holder.text.replace("$", "").replace(",", "")) info_holders = soup.select("dt") time_listed = info_holders[0].text views = int(info_holders[2].text.replace(",", "")) saves = info_holders[4].text property_data = { "name": row["name"], "price": price, "time_on_zillow": time_listed, "views": views, "saves": saves } print(property_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = 
list(csv.DictReader(file)) for row in reader: process_property(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
In the code above, we open the CSV file and call process_property() on each row from the file.

To store this data properly, we need a PropertyData class. This class acts much like the SearchData class from before, and it also gets passed into a DataPipeline. Here is our PropertyData class.

@dataclass
class PropertyData:
    name: str = ""
    price: int = 0
    time_on_zillow: str = ""
    views: int = 0
    saves: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
Inside process_property(), we now create a PropertyData object and then pass it into a DataPipeline
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" property_type: str = "" street_address: str = "" locality: str = "" region: str = "" postal_code: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass PropertyData: name: str = "" price: int = 0 time_on_zillow: str = "" views: int = 0 saves: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") excluded_types = ["BreadcrumbList", "Event"] for script_tag in script_tags: json_data = json.loads(script_tag.text) if json_data["@type"] not in excluded_types: search_data = SearchData( name=json_data["name"], property_type=json_data["@type"], street_address=json_data["address"]["streetAddress"], locality=json_data["address"]["addressLocality"], region=json_data["address"]["addressRegion"], postal_code=json_data["address"]["postalCode"], url=json_data["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_property(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") price_holder = soup.select_one("span[data-testid='price']") price = int(price_holder.text.replace("$", "").replace(",", "")) info_holders = soup.select("dt") time_listed = info_holders[0].text views = int(info_holders[2].text.replace(",", "")) saves = info_holders[4].text property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") property_data = PropertyData( name=row["name"], price=price, time_on_zillow=time_listed, views=views, saves=saves ) property_pipeline.add_data(property_data) property_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, 
retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_property(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To add concurrency, we once again replace a for loop with ThreadPoolExecutor. Take a look at the new function.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_property,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
This time, executor.map() takes the following arguments:

process_property is the function we want to run on each thread.
reader is the array of properties from our CSV file.
location and retries get passed in as arrays the same length as reader.

Finally, to route these property requests through the ScrapeOps Proxy, we change just one line of process_property():

response = requests.get(get_scrapeops_url(url, location=location))
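For reference, the wrapped URL is just the ScrapeOps endpoint with our parameters URL-encoded onto the query string. A rough, self-contained sketch of what get_scrapeops_url() builds (API key shortened to a placeholder):

from urllib.parse import urlencode

# Roughly what the proxied URL looks like for one of our search pages.
payload = {"api_key": "YOUR-KEY", "url": "https://www.zillow.com/pr/1_p/", "country": "us", "residential": True}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-KEY&url=https%3A%2F%2Fwww.zillow.com%2Fpr%2F1_p%2F&country=us&residential=True

With the proxy in place, here is our full code.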
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" property_type: str = "" street_address: str = "" locality: str = "" region: str = "" postal_code: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass PropertyData: name: str = "" price: int = 0 time_on_zillow: str = "" views: int = 0 saves: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tags = soup.select("script[type='application/ld+json']") excluded_types = ["BreadcrumbList", "Events"] for script_tag in script_tags: json_data = json.loads(script_tag.text) if json_data["@type"] not in excluded_types: search_data = SearchData( name=json_data["name"], property_type=json_data["@type"], street_address=json_data["address"]["streetAddress"], locality=json_data["address"]["addressLocality"], region=json_data["address"]["addressRegion"], postal_code=json_data["address"]["postalCode"], url=json_data["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_property(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") price_holder = soup.select_one("span[data-testid='price']") price = int(price_holder.text.replace("$", "").replace(",", "")) info_holders = soup.select("dt") time_listed = info_holders[0].text views = int(info_holders[2].text.replace(",", "")) saves = info_holders[4].text property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") property_data = PropertyData( name=row["name"], price=price, time_on_zillow=time_listed, views=views, saves=saves ) property_pipeline.add_data(property_data) property_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def 
process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_property, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Once again, we set our PAGES to 5 and our LOCATION to "us". Feel free to change any of the constants within main to tweak your results.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["pr"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
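As mentioned earlier, adding more locations is just a matter of extending keyword_list. For example, to crawl both Puerto Rico and Michigan:

## INPUT ---> List of keywords to scrape
# Each keyword gets its own crawl CSV: pr.csv and mi.csv in this case.
keyword_list = ["pr", "mi"]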
Zillow's Terms of Service and robots.txt, which you can view here, govern automated access to the site. It's important to note that violating these Terms could result in your account getting blocked or even permanently removed from the site.

When scraping, public data is generally considered legal throughout the world. Private data is any data that is gated behind a login or some other form of authentication. If you're not sure your scraper is legal, it's best to consult with an attorney who handles the jurisdiction of the site you're scraping.

Then check out ScrapeOps, the complete toolkit for web scraping.
Create a .env file and add your API key in this format:

SCRAPEOPS_API_KEY=your_api_key_here

Then create a new file called main.py
and insert this code into it:import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, fields, asdictimport timefrom dotenv import load_dotenvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException load_dotenv() API_KEY = os.getenv("SCRAPEOPS_API_KEY") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" property_type: str = "" street_address: str = "" locality: str = "" region: str = "" postal_code: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass PropertyData: name: str = "" price: int = 0 time_on_zillow: str = "" views: int = 0 saves: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10): url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/" scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode for attempt in range(retries): try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) # Wait for the body to ensure page has started loading WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # Wait for and find script elements script_elements = WebDriverWait(driver, timeout).until( EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']")) ) for script in script_elements: json_data = json.loads(script.get_attribute('innerHTML')) if json_data["@type"] != "BreadcrumbList": search_data = SearchData( name=json_data["name"], property_type=json_data["@type"], street_address=json_data["address"]["streetAddress"], locality=json_data["address"]["addressLocality"], region=json_data["address"]["addressRegion"], postal_code=json_data["address"]["postalCode"], url=json_data["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") return # Success, exit the function except (TimeoutException, WebDriverException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}") raise Exception(f"Max retries ({retries}) exceeded for URL: {url}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_property(row, location, retries=3, timeout=10): url = row["url"] scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') for attempt in range(retries): try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) # Wait for the body to ensure page has started loading WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # Extract price price_element = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']")) ) price = int(price_element.text.replace("$", "").replace(",", "")) # Extract other information info_elements = driver.find_elements(By.TAG_NAME, "dt") time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed" views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0 saves = info_elements[4].text if len(info_elements) > 4 else "No saves" property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") property_data = PropertyData( name=row["name"], price=price, time_on_zillow=time_listed, views=views, saves=saves ) 
property_pipeline.add_data(property_data) property_pipeline.close_pipeline() logger.info(f"Successfully parsed: {url}") return # Success, exit the function except (TimeoutException, WebDriverException, NoSuchElementException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}") raise Exception(f"Max retries ({retries}) exceeded for URL: {url}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_property, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
When you run the script, the crawl writes its results to pr.csv. The constants at the top of main control the job:

- MAX_THREADS: Determines the maximum number of threads used for concurrent scraping and processing.
- MAX_RETRIES: Sets the maximum number of retries for each request in case of failure (e.g., network issues, server errors).
- PAGES: Specifies the number of pages to scrape for each keyword. Each page contains multiple property listings.
- LOCATION: Defines the geographical location for the scraping. This parameter is used to adjust the proxy location to simulate requests from a specific country.
- keyword_list: A list of keywords representing different geographical areas or search terms on Zillow. Each keyword triggers a separate scraping job. ("pr" is Puerto Rico; if you want to do Michigan, add "mi".)

Each keyword's results are aggregated into a CSV file named after it, such as pr.csv.

While a simple HTTP client (like requests) can fetch a page's HTML, Zillow's anti-bot protections make that approach unreliable. Instead, we'll use Selenium to simulate a human browsing experience.

In the code, the following URL structure represents a search result page for Puerto Rico, with pagination handled by the number at the end:

https://www.zillow.com/pr/2_p/
Here, pr refers to the location (Puerto Rico), and 2_p specifies that we are on the second page of results.

Individual property pages, which the scraper visits later, look like this:

https://www.zillow.com/homedetails/459-Carr-Km-7-2-Int-Bo-Arenales-Aguadilla-PR-00603/363559698_zpid/
The search result pages embed their listing data in script elements of type application/ld+json, which contain the data we need. Here's how we would approach this in our scraper:

# Extract JSON data from search results
script_elements = WebDriverWait(driver, timeout).until(
    EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']"))
)

for script in script_elements:
    json_data = json.loads(script.get_attribute('innerHTML'))
    if json_data["@type"] != "BreadcrumbList":
        # Extract relevant fields from JSON
        search_data = SearchData(
            name=json_data["name"],
            property_type=json_data["@type"],
            street_address=json_data["address"]["streetAddress"],
            locality=json_data["address"]["addressLocality"],
            region=json_data["address"]["addressRegion"],
            postal_code=json_data["address"]["postalCode"],
            url=json_data["url"]
        )
        data_pipeline.add_data(search_data)
On individual property pages, we read the price and engagement stats from the rendered page instead:

# Wait for the price element and extract its value
price_element = WebDriverWait(driver, timeout).until(
    EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']"))
)
price = int(price_element.text.replace("$", "").replace(",", ""))

# Extract other details such as time listed, views, and saves
info_elements = driver.find_elements(By.TAG_NAME, "dt")
time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed"
views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0
saves = info_elements[4].text if len(info_elements) > 4 else "No saves"
Pagination follows a simple pattern; each page of results just increments the number before _p/:

https://www.zillow.com/pr/1_p/
https://www.zillow.com/pr/2_p/
https://www.zillow.com/pr/3_p/
Our scrape_search_results function handles pagination by building the URL from the page number it receives:

def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10):
    url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/"
    # The rest of the code follows to scrape data from this page
ScrapeOps also gives us a country param. country will not have any effect on our actual search results; instead it will route us through a server in whichever country we specify. For instance, if we want to appear in the US, we'd pass us in as our country. This helps bypass Zillow's geolocation blocks and improves the chances of successful scraping.

The get_scrapeops_url() function integrates this proxy service:

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
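For instance, wrapping a search URL looks roughly like this (a sketch; the api_key value depends on your own key):

target = "https://www.zillow.com/pr/1_p/"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.zillow.com%2Fpr%2F1_p%2F&country=us&residential=True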
The location parameter determines which country we appear to be browsing from (e.g., us for the US). This doesn't change the search results but helps us avoid being blocked by Zillow's anti-scraping mechanisms.

To set up the project, start with a new folder:

mkdir <your_directory_name>
cd <your_directory_name>
python -m venv venv
source venv/bin/activate  # Linux
# OR
venv\Scripts\activate  # Windows
pip install selenium python-dotenv
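Before writing any scraping code, it's worth a quick smoke test of the Selenium install. This is just a sketch; it assumes Chrome is installed locally (recent Selenium releases resolve the matching driver automatically via Selenium Manager):

from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless")

# Open a simple page and confirm the browser can be driven
with webdriver.Chrome(options=options) as driver:
    driver.get("https://www.example.com")
    print(driver.title)  # expected: "Example Domain"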
We'll use selenium to automate web browser interactions, and python-dotenv to securely manage sensitive information like login credentials or API keys by storing them in a separate .env file, which helps keep our main code clean and our secrets safe from accidental exposure.

The first function we'll write is scrape_search_results()
. This function handles the search result page scraping, extracting necessary details (like property URLs, addresses, and prices) and storing them in a CSV file.Here’s an outline:def scrape_search_results(keyword, location, retries=3, timeout=10): url = f"https://www.zillow.com/{keyword}/" scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') for attempt in range(retries): try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) # Wait for the body to ensure page has started loading WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # Wait for and find script elements script_elements = WebDriverWait(driver, timeout).until( EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']")) ) for script in script_elements: json_data = json.loads(script.get_attribute('innerHTML')) if json_data["@type"] != "BreadcrumbList": search_data = { "name": json_data["name"], "property_type": json_data["@type"], "street_address": json_data["address"]["streetAddress"], "locality": json_data["address"]["addressLocality"], "region": json_data["address"]["addressRegion"], "postal_code": json_data["address"]["postalCode"], "url": json_data["url"] } print(search_data) logger.info(f"Successfully parsed data from: {url}") return # Success, exit the function except (TimeoutException, WebDriverException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}") raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
The function takes a search term (keyword) and, once we wire up pagination below, a page_number. It sends a GET request to fetch the page, extracts the data, and prints the data. We've also included a retry mechanism to handle potential errors during scraping.

Zillow uses a page suffix (_p/
) to navigate between search results. As discussed earlier, we need to increment the page number in our URL. Here's how:

def start_scrape(keyword, pages, location, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, retries=retries)
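As a quick sanity check (a standalone sketch, not part of the scraper), you can print the URLs this produces for the first few pages and confirm the zero-based indices map to Zillow's one-based page numbers:

keyword = "pr"
for page in range(3):
    print(f"https://www.zillow.com/{keyword}/{page+1}_p/")
# https://www.zillow.com/pr/1_p/
# https://www.zillow.com/pr/2_p/
# https://www.zillow.com/pr/3_p/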
range(): We use Python's range() to handle multiple pages of results. Since Zillow pages start at 1 and range() starts at 0, we add +1 to the page number in the URL.

Next, we need a SearchData
class to structure the extracted information, like property address, price, and more.This ensures that we store data consistently in our CSV file.@dataclassclass SearchData: name: str = "" property_type: str = "" street_address: str = "" locality: str = "" region: str = "" postal_code: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
The class uses the @dataclass decorator for automatic method generation.

Next comes the DataPipeline
class that handles saving data to a csv file:class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv()
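Here's a minimal sketch (not from the tutorial) of how the pipeline is typically driven, using the SearchData class defined above:

# Queue records, dedupe by name, and flush to CSV
pipeline = DataPipeline(csv_filename="example.csv", storage_queue_limit=50)
pipeline.add_data(SearchData(name="123 Example St", region="PR"))
pipeline.add_data(SearchData(name="123 Example St", region="PR"))  # logged and dropped as a duplicate
pipeline.close_pipeline()  # writes anything still queued to example.csv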
We use the DataPipeline class to handle data efficiently and store it in CSV format:

- The save_to_csv method writes data to a CSV file. It creates new files or appends to existing ones as needed.
- It uses DictWriter for flexible field handling, writing headers for new files.
- The csv_file_open flag helps avoid concurrent CSV writes.
- The close_pipeline method saves any leftover data before shutdown.

To speed up the crawl, we run start_scrape with ThreadPoolExecutor
. This allows us to scrape several pages simultaneously, reducing overall runtime.

import concurrent.futures

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
Key points:

- We use ThreadPoolExecutor to run multiple scraping tasks simultaneously.
- We map the scrape_search_results function across multiple threads.
- The level of concurrency is controlled by the max_threads parameter.

Every request is routed through the get_scrapeops_url()
function. This function generates a proxy URL to route our requests through servers in different regions, making it more difficult for Zillow to block our scrapers.

import os
from dotenv import load_dotenv
from urllib.parse import urlencode

load_dotenv()
API_KEY = os.getenv("SCRAPEOPS_API_KEY")

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

print(get_scrapeops_url('https://zillow.com'))
By passing in a url and a location parameter (e.g., us), we ensure our requests are routed through a residential proxy, minimizing the risk of getting blocked by Zillow's anti-bot system.

Finally, we add a main
method to initiate the scraping process.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["pr"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
In main, we:

- Create a DataPipeline for each keyword.
- Call start_scrape
to begin the concurrent scraping processimport osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, fields, asdictimport timefrom dotenv import load_dotenvfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.common.exceptions import TimeoutException, WebDriverException load_dotenv() API_KEY = os.getenv("SCRAPEOPS_API_KEY") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" property_type: str = "" street_address: str = "" locality: str = "" region: str = "" postal_code: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10): url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/" scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode for attempt in range(retries): try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) # Wait for the body to ensure page has started loading WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # Wait for and find script elements script_elements = WebDriverWait(driver, timeout).until( EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']")) ) for script in script_elements: json_data = json.loads(script.get_attribute('innerHTML')) if json_data["@type"] != "BreadcrumbList": search_data = SearchData( name=json_data["name"], property_type=json_data["@type"], street_address=json_data["address"]["streetAddress"], locality=json_data["address"]["addressLocality"], region=json_data["address"]["addressRegion"], postal_code=json_data["address"]["postalCode"], url=json_data["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") return # Success, exit the function except (TimeoutException, WebDriverException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}") raise Exception(f"Max retries ({retries}) exceeded for URL: {url}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "uk" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
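At this point the crawler runs on its own and writes one CSV per keyword. The header row of that file simply mirrors the SearchData fields, which you can confirm with a one-liner (a throwaway sketch, not part of the scraper):

from dataclasses import fields

print(",".join(f.name for f in fields(SearchData)))
# name,property_type,street_address,locality,region,postal_code,url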
So far, we have built:

- get_scrapeops_url to use the ScrapeOps proxy, allowing us to bypass anti-bot measures.
- scrape_search_results to extract property data from Zillow pages using Selenium.
- start_scrape to handle multiple pages of search results (initially without concurrency).
- A SearchData class to structure our scraped information and a DataPipeline class to manage data storage and CSV writing.
- An updated start_scrape function with ThreadPoolExecutor to scrape multiple pages simultaneously, improving efficiency.

Every page we fetch, whether a search page or an individual listing, is still routed through the get_scrapeops_url() function:

scrapeops_proxy_url = get_scrapeops_url(url, location=location)

Next, we scrape the individual property pages with process_property():
def process_property(row, location, retries=3, timeout=10): url = row["url"] scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') for attempt in range(retries): try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) # Wait for the body to ensure page has started loading WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # Extract price price_element = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']")) ) price = int(price_element.text.replace("$", "").replace(",", "")) # Extract other information info_elements = driver.find_elements(By.TAG_NAME, "dt") time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed" views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0 saves = info_elements[4].text if len(info_elements) > 4 else "No saves" property_data = { 'name': row["name"], 'price': price, 'time_on_zillow': time_listed, 'views': views, 'saves': saves } print(property_data) logger.info(f"Successfully parsed: {url}") return # Success, exit the function except (TimeoutException, WebDriverException, NoSuchElementException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}") raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
"span[data-testid='price']"
is the CSS selector for the price, and int(price_element.text.replace("$", "").replace(",", "")) cleans up and converts the price into an integer. We then pull time_listed, views, and saves out of the info_elements list by index.

To run this on every listing we crawled, we read the crawl's CSV file and call process_property()
. Later, we'll add concurrency to speed things up.

def process_results(csv_file, location, retries=3):
    with open(csv_file, newline="") as file:
        reader = csv.DictReader(file)
        for row in reader:
            # process_property expects the full CSV row (it reads row["url"] and row["name"])
            process_property(row, location, retries=retries)
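For example, assuming the crawl already produced pr.csv, this intermediate version would be invoked as:

process_results("pr.csv", location="us")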
Each row from the CSV gets passed into the process_property() function.

To store what we scrape from each listing, we add a PropertyData class. This class will be similar to the SearchData
class we used earlier but specific to the details scraped from individual property pages.

@dataclass
class PropertyData:
    name: str = ""
    price: int = 0
    time_on_zillow: str = ""
    views: int = 0
    saves: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
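A tiny illustration (hypothetical values, not from the article) of what the __post_init__ hook does to a half-filled record:

record = PropertyData(name="  123 Example St  ", price=250000)
print(record.name)            # "123 Example St" -- whitespace stripped
print(record.time_on_zillow)  # "No time_on_zillow" -- empty strings get default text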
To write these records to disk, we reuse the DataPipeline
we created in the crawler section as follows:class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() @dataclassclass PropertyData: name: str = "" price: int = 0 time_on_zillow: str = "" views: int = 0 saves: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. 
value = getattr(self, field.name) setattr(self, field.name, value.strip()) def process_property(row, location, retries=3, timeout=10): url = row["url"] scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') for attempt in range(retries): try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) # Wait for the body to ensure page has started loading WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # Extract price price_element = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']")) ) price = int(price_element.text.replace("$", "").replace(",", "")) # Extract other information info_elements = driver.find_elements(By.TAG_NAME, "dt") time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed" views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0 saves = info_elements[4].text if len(info_elements) > 4 else "No saves" property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") property_data = PropertyData( name=row["name"], price=price, time_on_zillow=time_listed, views=views, saves=saves ) property_pipeline.add_data(property_data) property_pipeline.close_pipeline() logger.info(f"Successfully parsed: {url}") return # Success, exit the function except (TimeoutException, WebDriverException, NoSuchElementException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}") raise Exception(f"Max retries ({retries}) exceeded for URL: {url}")
Now we update process_results to use ThreadPoolExecutor. This helps us speed up the process by running multiple process_property() calls concurrently.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_property,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
executor.map() handles the parallel processing of the property rows. The process_property() function is called on each one, and the results are saved concurrently.

Time to run the finished scraper in production. In the snippet below, PAGES is set to 1 and LOCATION to "uk"; feel free to change any of the constants within main to tweak your results.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["pr"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Here is the final script in full:
from dotenv import load_dotenvimport osfrom urllib.parse import urlencodefrom selenium import webdriverfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.webdriver.common.by import Byfrom selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementExceptionfrom dataclasses import fields, asdict, dataclassimport csv import loggingimport timeimport concurrent.futuresimport json ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # Load environment variablesload_dotenv()API_KEY = os.getenv("SCRAPEOPS_API_KEY") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url @dataclassclass SearchData: name: str = "" property_type: str = "" street_address: str = "" locality: str = "" region: str = "" postal_code: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3, timeout=10): url = f"https://www.zillow.com/{keyword}/{page_number+1}_p/" scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') # Run in headless mode for attempt in range(retries): try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) # Wait for the body to ensure page has started loading WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # Wait for and find script elements script_elements = WebDriverWait(driver, timeout).until( EC.presence_of_all_elements_located((By.XPATH, "//script[@type='application/ld+json']")) ) for script in script_elements: json_data = json.loads(script.get_attribute('innerHTML')) if json_data["@type"] != "BreadcrumbList": search_data = SearchData( name=json_data["name"], property_type=json_data["@type"], street_address=json_data["address"]["streetAddress"], locality=json_data["address"]["addressLocality"], region=json_data["address"]["addressRegion"], postal_code=json_data["address"]["postalCode"], url=json_data["url"] ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") return # Success, exit the function except (TimeoutException, WebDriverException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}") raise Exception(f"Max retries ({retries}) exceeded for URL: {url}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) @dataclassclass PropertyData: name: str = "" price: int = 0 time_on_zillow: str = "" views: int = 0 saves: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. 
value = getattr(self, field.name) setattr(self, field.name, value.strip()) def process_property(row, location, retries=3, timeout=10): url = row["url"] scrapeops_proxy_url = get_scrapeops_url(url, location=location) options = webdriver.ChromeOptions() options.add_argument('--headless') for attempt in range(retries): try: with webdriver.Chrome(options=options) as driver: driver.get(scrapeops_proxy_url) # Wait for the body to ensure page has started loading WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # Extract price price_element = WebDriverWait(driver, timeout).until( EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='price']")) ) price = int(price_element.text.replace("$", "").replace(",", "")) # Extract other information info_elements = driver.find_elements(By.TAG_NAME, "dt") time_listed = info_elements[0].text if len(info_elements) > 0 else "No time listed" views = int(info_elements[2].text.replace(",", "")) if len(info_elements) > 2 else 0 saves = info_elements[4].text if len(info_elements) > 4 else "No saves" property_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") property_data = PropertyData( name=row["name"], price=price, time_on_zillow=time_listed, views=views, saves=saves ) property_pipeline.add_data(property_data) property_pipeline.close_pipeline() logger.info(f"Successfully parsed: {url}") return # Success, exit the function except (TimeoutException, WebDriverException, NoSuchElementException) as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, attempts left: {retries-attempt-1}") raise Exception(f"Max retries ({retries}) exceeded for URL: {url}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_property, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["pr"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") logger.info(f"Scrape starting...") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info(f"Scrape complete.")
Run the finished script with:

python <your_script_name>.py
The crawl first produces pr.csv. The script then reads this file and creates an individual report on each house.

You can review Zillow's robots.txt file here, which outlines rules for automated access. Not following these guidelines could lead to account suspension or a permanent ban. Generally, scraping publicly available data is legal in many regions, but accessing private data, such as anything that requires a login or other authentication, requires permission. If you're unsure about the legal aspects of your scraping activities, it's advisable to seek legal advice from an attorney who is familiar with the laws in your area.