Then check out ScrapeOps, the complete toolkit for web scraping.
To run it, you just need a `config.json` file with your ScrapeOps Proxy API key in the same folder as the script:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def find_pagination_urls(keyword, location, pages=4, retries=3):
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            a_tags = pagination_bar.find_all("a")
            links = []
            links.append(url)
            acceptable_pages = ["1", "2", "3", "4"]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    link = f"https://www.airbnb.com{href}"
                    links.append(link)
            success = True
            return links

        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1

    if not success:
        raise Exception("Failed to find pagination, max retries exceeded!")


def scrape_search_results(url, location, data_pipeline=None, retries=3):
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[data-testid='card-container']")
            for div_card in div_cards:
                description = div_card.select_one("div[data-testid='listing-card-title']").text
                subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")
                name = subtitle_array[0].text
                dates = subtitle_array[-1].text
                price = div_card.select_one("span div span").text
                href = div_card.find("a").get("href")
                link = f"https://www.airbnb.com{href}"

                search_data = SearchData(
                    name=name,
                    description=description,
                    dates=dates,
                    price=price,
                    url=link
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            url_list,
            [location] * len(url_list),
            [data_pipeline] * len(url_list),
            [retries] * len(url_list)
        )


def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.select("div[role='listitem']")
                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '-')}.csv")
                for review_card in review_cards:
                    name = review_card.find("h3").text
                    stars = len(review_card.find_all("svg"))
                    spans = review_card.find_all("span")
                    review = spans[-1].text

                    review_data = ReviewData(
                        name=name,
                        stars=stars,
                        review=review
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
If you'd like to tweak your results, you can change any of these constants inside `main`:

- `MAX_RETRIES`: Sets the maximum number of retry attempts the script will make if a request fails.
- `MAX_THREADS`: Sets the maximum number of threads (or concurrent tasks) that the script will use when scraping data.
- `PAGES`: Determines how many pages of search results the scraper will attempt to process.
- `LOCATION`: Specifies the country code for the location from which you want to simulate the scraping requests.
- `keyword_list`: A list of keywords or phrases that the script will use to search for listings on the website.

Here is the URL for our Myrtle Beach, South Carolina search:

https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes
So search URLs follow this format:

https://www.airbnb.com/s/{NAME-OF-SEARCH-LOCATION}/homes

Individual listing pages use a different layout. Here is an example of one:

https://www.airbnb.com/rooms/34653621?adults=1&children=0&enable_m3_private_room=true&infants=0&pets=0&search_mode=regular_search&check_in=2024-09-02&check_out=2024-09-07&source_impression_id=p3_1723223538_P3jJDPiXFbNNUsdP&previous_page_section_name=1000&federated_search_id=532193a1-1995-4edd-824a-5987dfa778f1
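As a quick sanity check, here's a tiny snippet (not part of the scraper itself) showing how the crawler we build later turns a plain-English keyword into that search URL:

```python
# Turn "Myrtle Beach, South Carolina, United States" into the Airbnb search URL.
# This mirrors the formatting used inside find_pagination_urls() further down.
keyword = "Myrtle Beach, South Carolina, United States"
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
print(url)
# https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes
```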
On the search results page, each listing is held inside a `div` card with a `data-testid` of `card-container`. Here is the CSS selector we would use: `"div[data-testid='card-container']"`.
On the listing pages, reviews are held in `div` elements with the `role` of `listitem`. Here is the CSS selector we would use: `"div[role='listitem']"`. Go ahead and look at it in the image below. From this `div`, we'll be able to pull all of our relevant review data.

Pagination is a little trickier. Instead of a simple page number, Airbnb paginates search results with a `cursor` parameter. Here is what a paginated search URL looks like:

https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes?tab_id=home_tab&refinement_paths%5B%5D=%2Fhomes&query=Myrtle%20Beach%2C%20South%20Carolina%2C%20United%20States&place_id=ChIJASFVO5VoAIkRGJbQtRWxD7w&flexible_trip_lengths%5B%5D=one_week&monthly_start_date=2024-09-01&monthly_length=3&monthly_end_date=2024-12-01&search_mode=regular_search&price_filter_input_type=0&channel=EXPLORE&federated_search_session_id=dcc6f5af-f1c5-4463-8c02-7e4dcf38a02d&search_type=unknown&pagination_search=true&cursor=eyJzZWN0aW9uX29mZnNldCI6MCwiaXRlbXNfb2Zmc2V0IjoxOCwidmVyc2lvbiI6MX0%3D
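If you're curious what that `cursor` value actually holds, it's just URL-encoded base64 JSON. A quick, optional snippet to peek inside it (our scraper doesn't use cursors; it paginates by scraping the pagination bar instead):

```python
import base64
from urllib.parse import unquote

# The cursor parameter from the URL above, still URL-encoded.
cursor = unquote("eyJzZWN0aW9uX29mZnNldCI6MCwiaXRlbXNfb2Zmc2V0IjoxOCwidmVyc2lvbiI6MX0%3D")
print(base64.b64decode(cursor).decode("utf-8"))
# {"section_offset":0,"items_offset":18,"version":1}
```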
To control our geolocation, we'll use the ScrapeOps Proxy API's `country` parameter. When we pass a `country` into ScrapeOps, they will route us through a server in that country. `"country": "us"` tells ScrapeOps that we want to appear in the US. If we want to appear in the UK, we'd pass `"country": "uk"`. This gives us an actual IP address from within the country of our choosing.

Before writing any code, create a new project folder and a virtual environment, then install the dependencies:

```
mkdir airbnb-scraper
cd airbnb-scraper

python -m venv venv
source venv/bin/activate

pip install requests
pip install beautifulsoup4
```
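All of the scripts in this article load the ScrapeOps API key from a `config.json` file in the project folder (`config["api_key"]`). A minimal example, with a placeholder value you'd swap for your own key:

```json
{
  "api_key": "YOUR-SCRAPEOPS-API-KEY"
}
```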
Here is our first parser. It fetches a single page of search results and prints each listing card as a dict:

```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(url, location, retries=3):
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[data-testid='card-container']")
            for div_card in div_cards:
                description = div_card.select_one("div[data-testid='listing-card-title']").text
                subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")
                name = subtitle_array[0].text
                dates = subtitle_array[-1].text
                price = div_card.select_one("span div span").text
                href = div_card.find("a").get("href")
                link = f"https://www.airbnb.com{href}"

                search_data = {
                    "name": name,
                    "description": description,
                    "dates": dates,
                    "price": price,
                    "url": link
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")
        formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")

        url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
        scrape_search_results(url, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
```
div_card.select_one("div[data-testid='listing-card-title']").text
gets our title.div_card.select("div[data-testid='listing-card-subtitle']")
.div_card.find("a").get("href")
finds the link to the listing page.link = f"https://www.airbnb.com{href}"
"nav[aria-label='Search results pagination']"
.Here is find_pagination_urls()
```python
def find_pagination_urls(keyword, location, pages=4, retries=3):
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            a_tags = pagination_bar.find_all("a")
            links = []
            links.append(url)
            acceptable_pages = ["1", "2", "3", "4"]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    link = f"https://www.airbnb.com{href}"
                    links.append(link)
            success = True
            return links

        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1

    if not success:
        raise Exception("Failed to find pagination, max retries exceeded!")
```
soup.select_one("nav[aria-label='Search results pagination']")
finds our bar of pagination links.pagination_bar.find_all("a")
.links
array.["1", "2", "3", "4"]
.return
it. We'll pass this array into our start_scrape()
function.start_scrape()
function to take in a list of urls and call scrape_search_results()
. It's very simple. It just takes in a url_list
and uses a for
loop to call scrape_search_results()
on each url.def start_scrape(url_list, location, retries=3): for url in url_list: scrape_search_results(url, location, retries=retries)
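As a rough usage sketch (assuming the functions above are defined in the same file), the two pieces fit together like this; in the real script this wiring lives inside the `__main__` block:

```python
# Hypothetical wiring of the two functions above.
keyword = "Myrtle Beach, South Carolina, United States"
page_urls = find_pagination_urls(keyword, "us", pages=4, retries=3)
print(page_urls)  # the base search URL plus up to 3 pagination links

# Prints a dict for every listing card found on each page.
start_scrape(page_urls, "us", retries=3)
```

When we put these pieces together with our parser, we get the full script below.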
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") pagination_bar = soup.select_one("nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_all("a") links = [] links.append(url) acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: href = a.get("href") link = f"https://www.airbnb.com{href}" links.append(link) success = True return links except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 if not success: raise Exception("Failed to find pagination, max retries exceeded!") def scrape_search_results(url, location, retries=3): tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='card-container']") for div_card in div_cards: descripition = div_card.select_one("div[data-testid='listing-card-title']").text subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']") name = subtitle_array[0].text dates = subtitle_array[-1].text price = div_card.select_one("span div span").text href = div_card.find("a").get("href") link = f"https://www.airbnb.com{href}" search_data = { "name": name, "description": descripition, "dates": dates, "price": price, "url": link } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries +=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(url_list, location, retries=3): for url in url_list: scrape_search_results(url, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) start_scrape(page_urls, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
In the code above, we added `start_scrape()` to run `scrape_search_results()` on each and every url generated from the list.

Now we need proper storage for our data. To get it, we need a `dataclass` and a `DataPipeline`. We'll call our `dataclass` `SearchData`. This `SearchData` gets passed into the `DataPipeline`, which pipes our data to a CSV file and removes duplicate results.

Here is our `SearchData`:
```python
@dataclass
class SearchData:
    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
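To see what `check_string_fields()` is doing for us, here's a small, hypothetical example of constructing a `SearchData` object with messy input:

```python
# Empty strings get a default like "No name"; other strings get stripped.
item = SearchData(name="", description=" Oceanfront Condo ", price="$150", url="")
print(item.name)         # No name
print(item.description)  # Oceanfront Condo
print(item.url)          # No url
```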
You can see our `DataPipeline` below:

```python
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
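Here's a quick, hypothetical sketch of how the pipeline gets used; the crawler below does exactly this. One thing to note: `close_pipeline()` calls `time.sleep()`, so make sure `time` is imported at the top of your script (the full code at the top of this article includes it).

```python
pipeline = DataPipeline(csv_filename="test-output.csv")

# Duplicate names are logged and dropped; everything else is queued.
pipeline.add_data(SearchData(name="Oceanfront Condo", price="$150"))
pipeline.add_data(SearchData(name="Oceanfront Condo", price="$150"))  # dropped

# Flushes whatever is still sitting in the queue out to the CSV file.
pipeline.close_pipeline()
```

The next full iteration of our crawler, shown below, wires these two classes into the crawl.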
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") pagination_bar = soup.select_one("nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_all("a") links = [] links.append(url) acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: href = a.get("href") link = f"https://www.airbnb.com{href}" links.append(link) success = True return links except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 if not success: raise Exception("Failed to find pagination, max retries exceeded!") def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='card-container']") for div_card in div_cards: descripition = div_card.select_one("div[data-testid='listing-card-title']").text subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']") name = subtitle_array[0].text dates = subtitle_array[-1].text price = div_card.select_one("span div span").text href = div_card.find("a").get("href") link = f"https://www.airbnb.com{href}" search_data = SearchData( name=name, description=descripition, dates=dates, price=price, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries +=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(url_list, location, data_pipeline=None, retries=3): for url in url_list: scrape_search_results(url, location, data_pipeline=data, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) 
crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
In our updated code, we:

- Open a new `DataPipeline` before starting our scrape.
- Pass the `DataPipeline` into `start_scrape()`, which in turn passes it into `scrape_search_results()`.
- Turn each parsed result into a `SearchData` object and pass it into the pipeline.
- Close the pipeline with `crawl_pipeline.close_pipeline()` once the crawl is finished.
Now we'll add concurrency with `ThreadPoolExecutor`. `ThreadPoolExecutor` opens up a new pool of threads, up to `max_threads`. On each of these open threads, it calls a function and passes arguments to it. Because the requests run in parallel instead of one at a time, this approach is dramatically faster than a simple `for` loop.

Here is our new `start_scrape()`:

```python
def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            url_list,
            [location] * len(url_list),
            [data_pipeline] * len(url_list),
            [retries] * len(url_list)
        )
```
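If `executor.map()` is new to you, here's a tiny standalone example of the same pattern with a made-up function, just to show how the argument lists line up:

```python
import concurrent.futures

def greet(name, greeting):
    return f"{greeting}, {name}!"

names = ["alice", "bob", "carol"]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each call gets one element from each list: greet("alice", "Hello"), etc.
    results = executor.map(greet, names, ["Hello"] * len(names))
    print(list(results))  # ['Hello, alice!', 'Hello, bob!', 'Hello, carol!']
```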
When you look at the arguments to `executor.map()`, you'll notice the following: `scrape_search_results` is the function we want to call on available threads, and `url_list`
is the list we want to run the function on.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") pagination_bar = soup.select_one("nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_all("a") links = [] links.append(url) acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: href = a.get("href") link = f"https://www.airbnb.com{href}" links.append(link) success = True return links except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 if not success: raise Exception("Failed to find pagination, max retries exceeded!") def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='card-container']") for div_card in div_cards: descripition = div_card.select_one("div[data-testid='listing-card-title']").text subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']") name = subtitle_array[0].text dates = subtitle_array[-1].text price = div_card.select_one("span div span").text href = div_card.find("a").get("href") link = f"https://www.airbnb.com{href}" search_data = SearchData( name=name, description=descripition, dates=dates, price=price, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries +=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = 
DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Now we'll add proxy support with one small function, `get_scrapeops_url()`. You can view it below:

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
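For example, wrapping one of our search URLs looks roughly like this (the exact query-string shown here is illustrative):

```python
target = "https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.airbnb.com%2F...&country=us&wait=5000
```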
"api_key"
is our ScrapeOps API key."url"
is the url we want to scrape."country"
holds the country we want to be routed through."wait"
tells ScrapeOps to wait a certain amount of time before sending back our result. This allows content to load on the page.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") pagination_bar = soup.select_one("nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_all("a") links = [] links.append(url) acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: href = a.get("href") link = f"https://www.airbnb.com{href}" links.append(link) success = True return links except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 if not success: raise Exception("Failed to find pagination, max retries exceeded!") def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='card-container']") for div_card in div_cards: descripition = div_card.select_one("div[data-testid='listing-card-title']").text subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']") name = subtitle_array[0].text dates = subtitle_array[-1].text price = div_card.select_one("span div span").text href = div_card.find("a").get("href") link = f"https://www.airbnb.com{href}" search_data = SearchData( name=name, description=descripition, dates=dates, price=price, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries +=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in 
keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Time to test it out. As before, everything runs from `main`. Feel free to change `MAX_THREADS`, `MAX_RETRIES`, `PAGES`, `LOCATION`, or `keyword_list` if you'd like to adjust your results. We're going to set `PAGES` to 4, which gives us the maximum number of pages from our pagination scraper.

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
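Save the script (the filename here is arbitrary) and run it from your activated virtual environment. With the keyword above, the crawl writes its results to `Myrtle-Beach-South-Carolina-United-States.csv`:

```
python airbnb_crawler.py
```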
Our test run came in at 24.595 seconds. Remember, we had our `wait` set to 5 seconds, so at least 5 of those seconds were spent waiting during the pagination scrape. 24.595 - 5 = 19.595 seconds spent actually crawling. 19.595 seconds / 4 pages = 4.89 seconds per page.

Now it's time to scrape reviews from the individual listings we crawled. We find all of the review cards on a listing page with `soup.select("div[role='listitem']")`. Once we have these cards, we iterate through them. On each card, we pull the `name`, `stars`, and `review`. These objects are the data we want to store for later review.

Here is the first version of `process_listing()`:

```python
def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.select("div[role='listitem']")
                for review_card in review_cards:
                    name = review_card.find("h3").text
                    stars = len(review_card.find_all("svg"))
                    spans = review_card.find_all("span")
                    review = spans[-1].text

                    review_data = {
                        "name": name,
                        "stars": stars,
                        "review": review
                    }
                    print(review_data)

                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
In this function, `review_cards = soup.select("div[role='listitem']")` finds our list of review cards. From each card, we then pull the:

- `name`
- `stars`
- `review`

Next, we need a function similar to `start_scrape()`. The main difference is that this one will first read the CSV file before calling the parsing function. Here is `process_results()`. First, we open and read our CSV file into an array, `reader`. After we've got our array, we iterate through it and call `process_listing()`:

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_listing(row, location, retries=retries)
```
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") pagination_bar = soup.select_one("nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_all("a") links = [] links.append(url) acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: href = a.get("href") link = f"https://www.airbnb.com{href}" links.append(link) success = True return links except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 if not success: raise Exception("Failed to find pagination, max retries exceeded!") def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='card-container']") for div_card in div_cards: descripition = div_card.select_one("div[data-testid='listing-card-title']").text subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']") name = subtitle_array[0].text dates = subtitle_array[-1].text price = div_card.select_one("span div span").text href = div_card.find("a").get("href") link = f"https://www.airbnb.com{href}" search_data = SearchData( name=name, description=descripition, dates=dates, price=price, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries +=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, 
"html.parser") review_cards = soup.select("div[role='listitem']") for review_card in review_cards: name = review_card.find("h3").text stars = len(review_card.find_all("svg")) spans = review_card.find_all("span") review = spans[-1].text review_data = { "name": name, "stars": stars, "review": review } print(review_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_listing(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
In the code above, we read our crawl CSV into an array and run `process_listing()` on each row from the CSV file.

To store the review data, we can reuse our `DataPipeline`; we just need to feed it a new `dataclass`. This one will represent the review objects we've been parsing in the examples above. We'll call our new `dataclass` `ReviewData`.

Here is our new `ReviewData` class:

```python
@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
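`ReviewData` cleans its string fields the same way `SearchData` does; the integer `stars` field is left alone. A tiny, hypothetical example:

```python
review = ReviewData(name="", stars=5, review="  Great stay, would book again.  ")
print(review.name)    # No name
print(review.stars)   # 5
print(review.review)  # Great stay, would book again.
```

In the full code below, `process_listing()` builds one of these for every review card and feeds it into its own `DataPipeline`.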
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" stars: int = 0 review: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") pagination_bar = soup.select_one("nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_all("a") links = [] links.append(url) acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: href = a.get("href") link = f"https://www.airbnb.com{href}" links.append(link) success = True return links except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 if not success: raise Exception("Failed to find pagination, max retries exceeded!") def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='card-container']") for div_card in div_cards: descripition = div_card.select_one("div[data-testid='listing-card-title']").text subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']") name = subtitle_array[0].text dates = subtitle_array[-1].text price = div_card.select_one("span div span").text href = div_card.find("a").get("href") link = f"https://www.airbnb.com{href}" search_data = SearchData( name=name, description=descripition, dates=dates, price=price, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries +=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, 
"html.parser") review_cards = soup.select("div[role='listitem']") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review_card in review_cards: name = review_card.find("h3").text stars = len(review_card.find_all("svg")) spans = review_card.find_all("span") review = spans[-1].text review_data = ReviewData( name=name, stars=stars, review=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_listing(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
We open a DataPipeline from inside process_listing() and feed ReviewData objects into the pipeline. Now we'll refactor process_results() exactly the same way we refactored start_scrape(). We'll use ThreadPoolExecutor to accomplish this again. Our first argument, process_listing, is the function we want to call on our available threads. reader is our array of listings. All other arguments get passed in as arrays, just like before.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
When fetching each individual listing inside process_listing(), we now route the request through the ScrapeOps proxy as well:

response = requests.get(get_scrapeops_url(url, location=location))
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" stars: int = 0 review: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") pagination_bar = soup.select_one("nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_all("a") links = [] links.append(url) acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: href = a.get("href") link = f"https://www.airbnb.com{href}" links.append(link) success = True return links except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 if not success: raise Exception("Failed to find pagination, max retries exceeded!") def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[data-testid='card-container']") for div_card in div_cards: descripition = div_card.select_one("div[data-testid='listing-card-title']").text subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']") name = subtitle_array[0].text dates = subtitle_array[-1].text price = div_card.select_one("span div span").text href = div_card.find("a").get("href") link = f"https://www.airbnb.com{href}" search_data = SearchData( name=name, description=descripition, dates=dates, price=price, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries +=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = 
BeautifulSoup(response.text, "html.parser") review_cards = soup.select("div[role='listitem']") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '-')}.csv") for review_card in review_cards: name = review_card.find("h3").text stars = len(review_card.find_all("svg")) spans = review_card.find_all("span") review = spans[-1].text review_data = ReviewData( name=name, stars=stars, review=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_listing, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 4 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Feel free to change MAX_RETRIES, MAX_THREADS, PAGES, LOCATION, and keyword_list if you want to tweak your results. You can view our updated main below. Once again, we have pages set to 4.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
When you scrape Airbnb, you are subject to both their terms of service and their robots.txt. You can view Airbnb's terms here. Their robots.txt is available for review here.

Public data is typically legal to scrape. Private data (data gated behind a login) is a completely different story. When scraping private data, you are subject to not only the site's terms but also the privacy laws that govern that site. Always consult an attorney when you have questions about the legality of your scraper.

Then check out ScrapeOps, the complete toolkit for web scraping.
import os import re import csv import json import logging import time from urllib.parse import urlencode from dataclasses import dataclass, fields, asdict from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclass class ReviewData: name: str = "" stars: int = 0 review: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = list(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries, success = 0, False links = [url] while tries < retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--disable-extensions") chrome_options.add_argument("--disable-blink-features=AutomationControlled") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']"))) pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_elements(By.TAG_NAME, "a") acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: link = a.get_attribute("href") if link: links.append(link) success = True driver.quit() except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 driver.quit() if not success: raise Exception("Failed to find pagination, max retries exceeded!") return links def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False scrapeops_proxy_url = get_scrapeops_url(url, location=location) while tries <= retries and not success: try: # Initialize WebDriver inside the function chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(scrapeops_proxy_url) logger.info(f"Loaded page: {url}") # Wait for listings to load WebDriverWait(driver, 20).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']")) ) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']") for div_card in div_cards: description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']") name = subtitle_elements[0].text if len(subtitle_elements) > 0 else "No Name" dates = subtitle_elements[-1].text if len(subtitle_elements) > 1 else "No Dates" price = div_card.find_element(By.CSS_SELECTOR, "span div span").text if div_card.find_elements(By.CSS_SELECTOR, "span div span") else "No Price" href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href") # Remove the proxy URL part and construct the original Airbnb URL original_url = href.replace("https://proxy.scrapeops.io/", "https://www.airbnb.com/") search_data = SearchData( name=name, description=description, 
dates=dates, price=price, url=original_url # Use the cleaned URL ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}") tries += 1 finally: driver.quit() # Ensures driver is closed on each attempt if not success: raise Exception(f"Max retries exceeded for: {url}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False csv_name = re.sub(r'[<>:"/|?*]', "", row["name"].replace(" ", "-")) scrapeops_proxy_url = get_scrapeops_url(url, location=location) while tries <= retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(scrapeops_proxy_url) logger.info(f"Accessing URL: {url}") # Wait for the review cards to load WebDriverWait(driver, 20).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "div[role='listitem']")) ) review_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") review_pipeline = DataPipeline(csv_filename=f"{csv_name}.csv") for review_card in review_cards: name = review_card.find_element(By.TAG_NAME, "h3").text stars = len(review_card.find_elements(By.TAG_NAME, "svg")) spans = review_card.find_elements(By.TAG_NAME, "span") review = spans[-1].text if spans else "No review available" review_data = ReviewData( name=name, stars=stars, review=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True logger.info(f"Successfully parsed: {url}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}") logger.warning(f"Retries left: {retries - tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_listing, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 4 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, 
LOCATION,max_threads=MAX_THREADS, retries=MAX_RETRIES)
- MAX_RETRIES: Specifies the maximum number of times the script will retry if a request fails.
- MAX_THREADS: Specifies the maximum number of concurrent tasks (or threads) the script will use while scraping data.
- PAGES: Sets the number of search result pages the scraper will try to process.
- LOCATION: Defines the country code of the location used when simulating the scraping requests.
- keyword_list: Contains the list of phrases or keywords the script will use to search for listings on the website.

A search for our Myrtle Beach keyword yields a URL like this one:

https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes
https://www.airbnb.com/s/{NAME-OF-SEARCH-LOCATION}/homes
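Search URLs follow the general format above. As a quick sketch, here is how a keyword is turned into one of those URLs, using the same string replacements the scrapers in this article rely on:

# Build an Airbnb search URL from a "City, State, Country" keyword
keyword = "Myrtle Beach, South Carolina, United States"
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
print(url)
# https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes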
Individual listing pages, by contrast, look like this:

https://www.airbnb.com/rooms/34653621?adults=1&children=0&enable_m3_private_room=true&infants=0&pets=0&search_mode=regular_search&check_in=2024-09-02&check_out=2024-09-07&source_impression_id=p3_1723223538_P3jJDPiXFbNNUsdP&previous_page_section_name=1000&federated_search_id=532193a1-1995-4edd-824a-5987dfa778f1
Each listing card on the search results page has a data-testid set to card-container. We can locate them through the CSS selector "div[data-testid='card-container']". Within these cards, we can access all the additional information we need to extract. The location of this data is shown in the HTML below.

Each review on a listing page sits inside a div that we can find with the selector "div[role='listitem']". Check it out in the image below. From this div, we can access all the review data relevant to us.

Paginated search results use URLs like this one:

https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes?tab_id=home_tab&refinement_paths%5B%5D=%2Fhomes&query=Myrtle%20Beach%2C%20South%20Carolina%2C%20United%20States&place_id=ChIJASFVO5VoAIkRGJbQtRWxD7w&flexible_trip_lengths%5B%5D=one_week&monthly_start_date=2024-09-01&monthly_length=3&monthly_end_date=2024-12-01&search_mode=regular_search&price_filter_input_type=0&channel=EXPLORE&federated_search_session_id=dcc6f5af-f1c5-4463-8c02-7e4dcf38a02d&search_type=unknown&pagination_search=true&cursor=eyJzZWN0aW9uX29mZnNldCI6MCwiaXRlbXNfb2Zmc2V0IjoxOCwidmVyc2lvbiI6MX0%3D
"country": "us"
directs ScrapeOps to make us appear in the US."country": "uk"
allows us to appear in the UK.mkdir airbnb-scraper cd airbnb-scraper
python -m venv venv
source venv/bin/activate
pip install selenium pip install webdriver-manager
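The scripts in this article load the ScrapeOps API key from a config.json file sitting next to the code. If you'd rather create that file from Python, a one-off snippet like this works (the key shown is a placeholder for your real API key):

# Write config.json with the "api_key" field the scrapers expect
import json

with open("config.json", "w") as config_file:
    json.dump({"api_key": "YOUR-SCRAPEOPS-API-KEY"}, config_file, indent=4)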
import os import json import logging from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def scrape_search_results(url, location, retries=3): # Initialize WebDriver inside the function chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) tries = 0 success = False while tries <= retries and not success: try: driver.get(url) logger.info(f"Loaded page: {url}") # Wait for listings to load WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']")) ) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']") for div_card in div_cards: description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']") name = subtitle_elements[0].text dates = subtitle_elements[-1].text price = div_card.find_element(By.CSS_SELECTOR, "span div span").text href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href") link = f"https://www.airbnb.com{href}" search_data = { "name": name, "description": description, "dates": dates, "price": price, "url": link } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") tries += 1 driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] # Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" scrape_search_results(url, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
- We pull the description with div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text.
- The name and dates come from div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']").
- We extract the href with div_card.find_element(By.TAG_NAME, "a").get_attribute("href") and turn it into a full listing URL with link = f"https://www.airbnb.com{href}".

To crawl more than the first page of results, we need to find the other pages. The pagination bar can be located with the CSS selector "nav[aria-label='Search results pagination']". Here is find_pagination_urls():

def find_pagination_urls(keyword, location, pages=4, retries=3):
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"

    tries = 0
    success = False

    # Initialize WebDriver
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # Run headless for speed
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            driver.get(url)
            logger.info(f"Loaded page: {url}")

            # Wait for pagination bar to load
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']"))
            )

            pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")
            a_tags = pagination_bar.find_elements(By.TAG_NAME, "a")

            links = [url]  # Start with the first page link
            acceptable_pages = ["1", "2", "3", "4"]

            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get_attribute("href")
                    links.append(href)

            success = True
            return links

        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1

    driver.quit()

    if not success:
        raise Exception("Failed to find pagination, max retries exceeded!")
driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']") locates the pagination links bar. With pagination_bar.find_elements(By.TAG_NAME, "a"), we locate all links on this bar. We keep a list of acceptable page numbers, ["1", "2", "3", "4"], to match with the button links. The page URLs then get fed into a start_scrape() function.

Now, we'll write a start_scrape() function to accept a list of URLs and to call scrape_search_results() for each URL in url_list with a simple for loop.

def start_scrape(url_list, location, retries=3):
    for url in url_list:
        scrape_search_results(url, location, retries=retries)
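As a rough sketch of how these two pieces fit together (the constant values mirror the main block used later in this article):

# Find up to 4 result pages for the keyword, then scrape each page in turn
keyword = "Myrtle Beach, South Carolina, United States"
page_urls = find_pagination_urls(keyword, "us", pages=4, retries=3)
start_scrape(page_urls, "us", retries=3)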
import os import json import logging from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries = 0 success = False # Initialize WebDriver chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(url) logger.info(f"Loaded page: {url}") # Wait for pagination bar to load WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")) ) pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_elements(By.TAG_NAME, "a") links = [url] # Start with the first page link acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: href = a.get_attribute("href") links.append(href) success = True return links except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 driver.quit() if not success: raise Exception("Failed to find pagination, max retries exceeded!") def scrape_search_results(url, location, retries=3): tries = 0 success = False # Initialize WebDriver inside the function chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) while tries <= retries and not success: try: driver.get(url) logger.info(f"Loaded page: {url}") # Wait for listings to load WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']")) ) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']") for div_card in div_cards: description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']") name = subtitle_elements[0].text dates = subtitle_elements[-1].text price = div_card.find_element(By.CSS_SELECTOR, "span div span").text href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href") link = f"https://www.airbnb.com{href}" search_data = { "name": name, "description": description, "dates": dates, "price": price, "url": link } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") 
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") tries += 1 driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(url_list, location, retries=3): for url in url_list: scrape_search_results(url, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] # Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) start_scrape(page_urls, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
In our main section, we then call start_scrape() to apply scrape_search_results() to each URL in the generated list.

To store the data properly, we need a DataPipeline and a dataclass. We'll name the dataclass SearchData. This SearchData is passed into the DataPipeline, which transfers our data to a CSV file and removes any duplicate results. Here is our SearchData dataclass:

@dataclass
class SearchData:
    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # If a string field is empty, give it default text; otherwise strip whitespace
            if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "":
                setattr(self, field.name, f"No {field.name}")
            else:
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
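As a small, hypothetical example of what check_string_fields() does, an empty field gets default text and surrounding whitespace gets stripped:

# Illustrative only: default text for empty fields, stripped whitespace otherwise
item = SearchData(name="  Beachfront Condo  ", price="")
print(item.name)   # "Beachfront Condo"  (leading/trailing spaces removed)
print(item.price)  # "No price"          (empty string replaced with a default)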
Our SearchData objects get passed into the DataPipeline below, which queues them, filters out duplicates by name, and writes everything to a CSV file:

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0

        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
import os import csv import json import logging import time from dataclasses import dataclass, fields, asdict from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Data class to hold scraped information @dataclass class SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) # Pipeline class to manage data storage and duplicates class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = list(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries, success = 0, False links = [url] while tries < retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']"))) pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_elements(By.TAG_NAME, "a") acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: link = a.get_attribute("href") if link: links.append(link) success = True driver.quit() except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 driver.quit() if not success: raise Exception("Failed to find pagination, max retries exceeded!") return links def scrape_search_results(url, location,data_pipeline=None, retries=3): tries = 0 success = False while tries <= retries and not success: try: # Initialize WebDriver inside the function chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) logger.info(f"Loaded page: {url}") # Wait for listings to load WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']")) ) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']") for div_card in div_cards: description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']") name = subtitle_elements[0].text dates = subtitle_elements[-1].text price = div_card.find_element(By.CSS_SELECTOR, "span div span").text href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href") link = f"https://www.airbnb.com{href}" search_data = SearchData( name=name, description=description, dates=dates, price=price, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") tries += 1 driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(url_list, location, data_pipeline=None, retries=3): for url in url_list: 
scrape_search_results(url, location, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 PAGES = 4 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["Myrtle Beach, South Carolina, United States"] for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
From the main section, our DataPipeline gets passed into start_scrape(), which subsequently forwards it to scrape_search_results(). Every time we parse a listing card, a SearchData object is created and directed into the pipeline. Once the crawl is finished, we close the pipeline with crawl_pipeline.close_pipeline().
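To make that flow concrete, here is a minimal sketch of the pipeline used on its own; the filename and field values are purely illustrative:

# Illustrative usage of the DataPipeline defined above
pipeline = DataPipeline(csv_filename="example-output.csv")

# Each parsed card becomes a SearchData object and goes into the pipeline
pipeline.add_data(SearchData(
    name="Example Listing",
    description="Oceanfront condo",
    dates="Sep 2 - 7",
    price="$150",
    url="https://www.airbnb.com/rooms/00000000"
))

# A second item with the same name is dropped by is_duplicate()
pipeline.add_data(SearchData(name="Example Listing"))

# Flush anything still in the queue to the CSV file
pipeline.close_pipeline()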
Next, we'll add concurrency with ThreadPoolExecutor. A new pool of threads, up to max_threads, will be opened by ThreadPoolExecutor. It then calls a function on each of these open threads and passes arguments to it. This method is significantly faster than using a simple for loop. Here is our updated start_scrape():

def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            url_list,
            [location] * len(url_list),
            [data_pipeline] * len(url_list),
            [retries] * len(url_list)
        )
If you look closely at the arguments to executor.map(), you will observe the following: scrape_search_results is the function we want to call on our available threads. url_list is the array of page URLs we want to scrape. All other arguments get passed in as arrays of matching length so they can be forwarded to each call.
.import os import csv import json import logging import time from dataclasses import dataclass, fields, asdict from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Data class to hold scraped information @dataclass class SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) # Pipeline class to manage data storage and duplicates class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = list(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries, success = 0, False links = [url] while tries < retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']"))) pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_elements(By.TAG_NAME, "a") acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: link = a.get_attribute("href") if link: links.append(link) success = True driver.quit() except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 driver.quit() if not success: raise Exception("Failed to find pagination, max retries exceeded!") return links def scrape_search_results(url, location,data_pipeline=None, retries=3): tries = 0 success = False while tries <= retries and not success: try: # Initialize WebDriver inside the function chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) logger.info(f"Loaded page: {url}") # Wait for listings to load WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']")) ) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']") for div_card in div_cards: description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']") name = subtitle_elements[0].text dates = subtitle_elements[-1].text price = div_card.find_element(By.CSS_SELECTOR, "span div span").text href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href") link = f"https://www.airbnb.com{href}" search_data = SearchData( name=name, description=description, dates=dates, price=price, url=link ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries}") tries += 1 driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with 
concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 4 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["Myrtle Beach, South Carolina, United States"] for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS,retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
To route our requests through the ScrapeOps Proxy API, we use get_scrapeops_url(), as shown below.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
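As a quick illustration of what the wrapper produces (your actual key is read from config.json, so the value in the comment is just a placeholder):

# Wrap an Airbnb search URL so the request goes through the ScrapeOps proxy
target_url = "https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes"
proxied_url = get_scrapeops_url(target_url, location="us")
print(proxied_url)
# Roughly: https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.airbnb.com%2Fs%2F...&country=us&wait=5000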
"api_key."
"url"
."country"
."Wait"
instructs ScrapeOps to pause for a specified time before returning our result, allowing the page's content to load.import os import csv import json import logging import time from urllib.parse import urlencode from dataclasses import dataclass, fields, asdict from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Data class to hold scraped information @dataclass class SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) # Pipeline class to manage data storage and duplicates class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = list(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries, success = 0, False links = [url] while tries < retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--disable-extensions") chrome_options.add_argument("--disable-blink-features=AutomationControlled") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']"))) pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_elements(By.TAG_NAME, "a") acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: link = a.get_attribute("href") if link: links.append(link) success = True driver.quit() except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 driver.quit() if not success: raise Exception("Failed to find pagination, max retries exceeded!") return links def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False scrapeops_proxy_url = get_scrapeops_url(url, location=location) while tries <= retries and not success: try: # Initialize WebDriver inside the function chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(scrapeops_proxy_url) logger.info(f"Loaded page: {url}") # Wait for listings to load WebDriverWait(driver, 20).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']")) ) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']") for div_card in div_cards: description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']") name = subtitle_elements[0].text if len(subtitle_elements) > 0 else "No Name" dates = subtitle_elements[-1].text if len(subtitle_elements) > 1 else "No Dates" price = div_card.find_element(By.CSS_SELECTOR, "span div span").text if div_card.find_elements(By.CSS_SELECTOR, "span div span") else "No Price" href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href") # Remove the proxy URL part and construct the original Airbnb URL original_url = href.replace("https://proxy.scrapeops.io/", "https://www.airbnb.com/") search_data = SearchData( name=name, description=description, 
dates=dates, price=price, url=original_url # Use the cleaned URL ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}") tries += 1 finally: driver.quit() # Ensures driver is closed on each attempt if not success: raise Exception(f"Max retries exceeded for: {url}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 4 LOCATION = "us" logger.info(f"Crawl starting...") keyword_list = ["Myrtle Beach, South Carolina, United States"] for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS,retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
Feel free to change MAX_THREADS, MAX_RETRIES, PAGES, LOCATION, or keyword_list if you want to modify your results. We will set PAGES to 4, providing the maximum pages from our pagination scraper.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    keyword_list = ["Myrtle Beach, South Carolina, United States"]

    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()

    logger.info(f"Crawl complete.")
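For example, to crawl two search locations in a single run, you could simply extend keyword_list; the second city here is purely illustrative:

# Hypothetical settings: two keywords, two result pages each
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "us"
keyword_list = [
    "Myrtle Beach, South Carolina, United States",
    "Savannah, Georgia, United States"
]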
Using driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']"), we first locate all review cards. We then iterate through these cards, pulling the name, stars, and review from each one. These objects represent the data we aim to store for future review.

def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            chrome_options = Options()
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")

            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
            driver.get(url)
            logger.info(f"Accessing URL: {url}")

            # Wait for the review cards to load
            WebDriverWait(driver, 20).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, "div[role='listitem']"))
            )

            review_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']")

            for review_card in review_cards:
                name = review_card.find_element(By.TAG_NAME, "h3").text
                stars = len(review_card.find_elements(By.TAG_NAME, "svg"))
                spans = review_card.find_elements(By.TAG_NAME, "span")
                review = spans[-1].text if spans else "No review available"

                review_data = {
                    "name": name,
                    "stars": stars,
                    "review": review
                }

                print(review_data)

            success = True
            logger.info(f"Successfully parsed: {url}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {url}")
            logger.warning(f"Retries left: {retries - tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") locates the list of review cards. From each review card, we extract the following details:

- name: taken from the card's h3 element.
- stars: counted from the svg (star) elements inside the card.
- review: the text of the card's last span element.

process_results() is quite similar to start_scrape(). The primary distinction is that this function will read a CSV file before invoking the parsing function. Here is process_results(). It loads the file into reader using csv.DictReader() and then calls process_listing() on each row.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_listing(row, location, retries=retries)
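To make the data flow concrete, here is a minimal standalone sketch of what those rows contain. It assumes the crawl above has already produced its CSV; the filename is derived from the keyword exactly as the runner builds it, and the printed fields are just for inspection.

import csv

# Each row from csv.DictReader is a dict keyed by the SearchData fields
# written during the crawl, for example:
# {"name": "...", "description": "...", "dates": "...", "price": "...", "url": "https://www.airbnb.com/rooms/..."}
with open("Myrtle-Beach-South-Carolina-United-States.csv", newline="") as f:
    for row in csv.DictReader(f):
        print(row["name"], row["url"])  # process_listing() only needs row["url"]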
import os import csv import json import logging import time from urllib.parse import urlencode from dataclasses import dataclass, fields, asdict from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = list(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries, success = 0, False links = [url] while tries < retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--disable-extensions") chrome_options.add_argument("--disable-blink-features=AutomationControlled") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']"))) pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_elements(By.TAG_NAME, "a") acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: link = a.get_attribute("href") if link: links.append(link) success = True driver.quit() except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 driver.quit() if not success: raise Exception("Failed to find pagination, max retries exceeded!") return links def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False scrapeops_proxy_url = get_scrapeops_url(url, location=location) while tries <= retries and not success: try: # Initialize WebDriver inside the function chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(scrapeops_proxy_url) logger.info(f"Loaded page: {url}") # Wait for listings to load WebDriverWait(driver, 20).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']")) ) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']") for div_card in div_cards: description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']") name = subtitle_elements[0].text if len(subtitle_elements) > 0 else "No Name" dates = subtitle_elements[-1].text if len(subtitle_elements) > 1 else "No Dates" price = div_card.find_element(By.CSS_SELECTOR, "span div span").text if div_card.find_elements(By.CSS_SELECTOR, "span div span") else "No Price" href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href") # Remove the proxy URL part and construct the original Airbnb URL original_url = href.replace("https://proxy.scrapeops.io/", "https://www.airbnb.com/") search_data = SearchData( name=name, description=description, 
dates=dates, price=price, url=original_url # Use the cleaned URL ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}") tries += 1 finally: driver.quit() # Ensures driver is closed on each attempt if not success: raise Exception(f"Max retries exceeded for: {url}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) logger.info(f"Accessing URL: {url}") # Wait for the review cards to load WebDriverWait(driver, 20).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "div[role='listitem']")) ) review_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") for review_card in review_cards: name = review_card.find_element(By.TAG_NAME, "h3").text stars = len(review_card.find_elements(By.TAG_NAME, "svg")) spans = review_card.find_elements(By.TAG_NAME, "span") review = spans[-1].text if spans else "No review available" review_data = { "name": name, "stars": stars, "review": review } print(review_data) success = True logger.info(f"Successfully parsed: {url}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}") logger.warning(f"Retries left: {retries - tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_listing(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES) # Ensure file is a string path
process_results() reads the crawl CSV and applies process_listing() to each row from the CSV file. To represent each review we scrape, we need ReviewData. Below is the definition of our new ReviewData class.

@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
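As a quick illustration (not part of the scraper itself), this is how the __post_init__() hook normalizes the fields whenever a ReviewData object is constructed:

# Illustrative only: shows the defaulting/stripping behavior of ReviewData.
example = ReviewData(name="  Jane D. ", stars=5, review="")
print(example.name)    # "Jane D."   -> leading/trailing whitespace stripped
print(example.stars)   # 5           -> non-string fields are left untouched
print(example.review)  # "No review" -> empty strings get a default placeholder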
import os import re import csv import json import logging import time from urllib.parse import urlencode from dataclasses import dataclass, fields, asdict from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclass class ReviewData: name: str = "" stars: int = 0 review: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = list(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries, success = 0, False links = [url] while tries < retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--disable-extensions") chrome_options.add_argument("--disable-blink-features=AutomationControlled") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']"))) pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_elements(By.TAG_NAME, "a") acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: link = a.get_attribute("href") if link: links.append(link) success = True driver.quit() except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 driver.quit() if not success: raise Exception("Failed to find pagination, max retries exceeded!") return links def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False scrapeops_proxy_url = get_scrapeops_url(url, location=location) while tries <= retries and not success: try: # Initialize WebDriver inside the function chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(scrapeops_proxy_url) logger.info(f"Loaded page: {url}") # Wait for listings to load WebDriverWait(driver, 20).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']")) ) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']") for div_card in div_cards: description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']") name = subtitle_elements[0].text if len(subtitle_elements) > 0 else "No Name" dates = subtitle_elements[-1].text if len(subtitle_elements) > 1 else "No Dates" price = div_card.find_element(By.CSS_SELECTOR, "span div span").text if div_card.find_elements(By.CSS_SELECTOR, "span div span") else "No Price" href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href") # Remove the proxy URL part and construct the original Airbnb URL original_url = href.replace("https://proxy.scrapeops.io/", "https://www.airbnb.com/") search_data = SearchData( name=name, description=description, 
dates=dates, price=price, url=original_url # Use the cleaned URL ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}") tries += 1 finally: driver.quit() # Ensures driver is closed on each attempt if not success: raise Exception(f"Max retries exceeded for: {url}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False csv_name = re.sub(r'[<>:"/|?*]', "", row["name"].replace(" ", "-")) while tries <= retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) logger.info(f"Accessing URL: {url}") # Wait for the review cards to load WebDriverWait(driver, 20).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "div[role='listitem']")) ) review_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") review_pipeline = DataPipeline(csv_filename=f"{csv_name}.csv") for review_card in review_cards: name = review_card.find_element(By.TAG_NAME, "h3").text stars = len(review_card.find_elements(By.TAG_NAME, "svg")) spans = review_card.find_elements(By.TAG_NAME, "span") review = spans[-1].text if spans else "No review available" review_data = ReviewData( name=name, stars=stars, review=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True logger.info(f"Successfully parsed: {url}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}") logger.warning(f"Retries left: {retries - tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_listing(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES) # Ensure file is a string path
Inside process_listing(), we now initiate a new DataPipeline. We then pass the ReviewData objects into this pipeline. Once the parsing operation is complete, we close the pipeline and exit the function.

process_results() will follow the same approach as used in start_scrape(). To do this, we'll use ThreadPoolExecutor once more. The first argument, process_listing, specifies the function we intend to run across available threads. The array of listings is reader, and all other arguments are passed in as arrays, similar to the previous approach.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
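If the broadcasting in executor.map() looks unfamiliar, here is a minimal, self-contained sketch of the same pattern with a made-up worker function: each iterable supplies one argument per call, so repeating a constant with [value] * len(items) sends the same value to every task.

import concurrent.futures

def fake_worker(row, location, retries):
    # Hypothetical stand-in for process_listing(); it only shows the call shape.
    print(f"row={row}, location={location}, retries={retries}")

rows = [{"url": "https://example.com/1"}, {"url": "https://example.com/2"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(
        fake_worker,
        rows,                 # one row per task
        ["us"] * len(rows),   # same location for every task
        [3] * len(rows)       # same retry count for every task
    )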
To route process_listing() through the ScrapeOps Proxy, we only need to change two lines: scrapeops_proxy_url = get_scrapeops_url(url, location=location) converts the listing URL into a proxied URL, and driver.get(scrapeops_proxy_url) opens that proxied URL instead of hitting Airbnb directly.
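If you want to sanity-check what the browser will actually load, a small sketch like this (assuming config.json is set up as above, and using a hypothetical listing URL) prints the proxied URL before it is handed to Selenium:

# Minimal sketch: inspect the proxied URL that driver.get() will receive.
listing_url = "https://www.airbnb.com/rooms/12345678"  # hypothetical listing URL
proxied_url = get_scrapeops_url(listing_url, location="us")
print(proxied_url)
# Roughly: https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.airbnb.com%2Frooms%2F12345678&country=us&wait=5000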
import os import re import csv import json import logging import time from urllib.parse import urlencode from dataclasses import dataclass, fields, asdict from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager import concurrent.futures API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class SearchData: name: str = "" description: str = "" dates: str = "" price: str = "" url: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") else: value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclass class ReviewData: name: str = "" stars: int = 0 review: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = list(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def find_pagination_urls(keyword, location, pages=4, retries=3): formatted_keyword = keyword.replace(", ", "--").replace(" ", "-") url = f"https://www.airbnb.com/s/{formatted_keyword}/homes" tries, success = 0, False links = [url] while tries < retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") # Run headless for speed chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--disable-extensions") chrome_options.add_argument("--disable-blink-features=AutomationControlled") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(url) WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']"))) pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']") a_tags = pagination_bar.find_elements(By.TAG_NAME, "a") acceptable_pages = ["1", "2", "3", "4"] for a in a_tags: if a.text in acceptable_pages and len(links) < pages: link = a.get_attribute("href") if link: links.append(link) success = True driver.quit() except Exception as e: logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}") logger.warning(f"Exception: {e}") tries += 1 driver.quit() if not success: raise Exception("Failed to find pagination, max retries exceeded!") return links def scrape_search_results(url, location, data_pipeline=None, retries=3): tries = 0 success = False scrapeops_proxy_url = get_scrapeops_url(url, location=location) while tries <= retries and not success: try: # Initialize WebDriver inside the function chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(scrapeops_proxy_url) logger.info(f"Loaded page: {url}") # Wait for listings to load WebDriverWait(driver, 20).until( EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']")) ) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']") for div_card in div_cards: description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']") name = subtitle_elements[0].text if len(subtitle_elements) > 0 else "No Name" dates = subtitle_elements[-1].text if len(subtitle_elements) > 1 else "No Dates" price = div_card.find_element(By.CSS_SELECTOR, "span div span").text if div_card.find_elements(By.CSS_SELECTOR, "span div span") else "No Price" href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href") # Remove the proxy URL part and construct the original Airbnb URL original_url = href.replace("https://proxy.scrapeops.io/", "https://www.airbnb.com/") search_data = SearchData( name=name, description=description, 
dates=dates, price=price, url=original_url # Use the cleaned URL ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}") tries += 1 finally: driver.quit() # Ensures driver is closed on each attempt if not success: raise Exception(f"Max retries exceeded for: {url}") def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, url_list, [location] * len(url_list), [data_pipeline] * len(url_list), [retries] * len(url_list) ) def process_listing(row, location, retries=3): url = row["url"] tries = 0 success = False csv_name = re.sub(r'[<>:"/|?*]', "", row["name"].replace(" ", "-")) scrapeops_proxy_url = get_scrapeops_url(url, location=location) while tries <= retries and not success: try: chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options) driver.get(scrapeops_proxy_url) logger.info(f"Accessing URL: {url}") # Wait for the review cards to load WebDriverWait(driver, 20).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, "div[role='listitem']")) ) review_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") review_pipeline = DataPipeline(csv_filename=f"{csv_name}.csv") for review_card in review_cards: name = review_card.find_element(By.TAG_NAME, "h3").text stars = len(review_card.find_elements(By.TAG_NAME, "svg")) spans = review_card.find_elements(By.TAG_NAME, "span") review = spans[-1].text if spans else "No review available" review_data = ReviewData( name=name, stars=stars, review=review ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True logger.info(f"Successfully parsed: {url}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {url}") logger.warning(f"Retries left: {retries - tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_listing, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["Myrtle Beach, South Carolina, United States"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(", ", "-").replace(" ", "-") page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES) crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, 
LOCATION,max_threads=MAX_THREADS, retries=MAX_RETRIES)
As before, you can adjust MAX_RETRIES, MAX_THREADS, PAGES, LOCATION, and keyword_list.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)