
Scrape Airbnb With Python Selenium

How to Scrape Airbnb With Selenium

Airbnb has been in operation since 2008, and its rise in popularity completely transformed the rental and hotel industries. Instead of booking a hotel, you can search Airbnb for short-stay rental properties, which gives us a unique dataset to analyze.

Today, we’ll build a scraper project to collect Airbnb listings along with their reviews.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Airbnb

If reading is not your preference but you need a scraper, you've come to the right place. Below, we offer a pre-built scraper ready for you to use.

  1. Start by creating a new project folder and include a config.json file with your ScrapeOps API keys.
  2. Next, create a new Python file and insert the following code.

import os
import re
import csv
import json
import logging
import time
from urllib.parse import urlencode
from dataclasses import dataclass, fields, asdict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "":
                setattr(self, field.name, f"No {field.name}")
            else:
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    tries, success = 0, False
    links = [url]

    while tries < retries and not success:
        try:
            chrome_options = Options()
            chrome_options.add_argument("--headless")  # Run headless for speed
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--disable-extensions")
            chrome_options.add_argument("--disable-blink-features=AutomationControlled")

            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

            driver.get(url)
            WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")))

            pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")
            a_tags = pagination_bar.find_elements(By.TAG_NAME, "a")
            acceptable_pages = ["1", "2", "3", "4"]

            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    link = a.get_attribute("href")
                    if link:
                        links.append(link)

            success = True
            driver.quit()
        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1
            driver.quit()
    if not success:
        raise Exception("Failed to find pagination, max retries exceeded!")

    return links

def scrape_search_results(url, location, data_pipeline=None, retries=3):
    tries = 0
    success = False
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)

    while tries <= retries and not success:
        try:
            # Initialize WebDriver inside the function
            chrome_options = Options()
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")

            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

            driver.get(scrapeops_proxy_url)
            logger.info(f"Loaded page: {url}")

            # Wait for listings to load
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']"))
            )

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']")

            for div_card in div_cards:
                description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text
                subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']")

                name = subtitle_elements[0].text if len(subtitle_elements) > 0 else "No Name"
                dates = subtitle_elements[-1].text if len(subtitle_elements) > 1 else "No Dates"
                price = div_card.find_element(By.CSS_SELECTOR, "span div span").text if div_card.find_elements(By.CSS_SELECTOR, "span div span") else "No Price"
                href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")

                # Remove the proxy URL part and construct the original Airbnb URL
                original_url = href.replace("https://proxy.scrapeops.io/", "https://www.airbnb.com/")

                search_data = SearchData(
                    name=name,
                    description=description,
                    dates=dates,
                    price=price,
                    url=original_url  # Use the cleaned URL
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}")
            tries += 1
        finally:
            driver.quit()  # Ensures driver is closed on each attempt

    if not success:
        raise Exception(f"Max retries exceeded for: {url}")

def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            url_list,
            [location] * len(url_list),
            [data_pipeline] * len(url_list),
            [retries] * len(url_list)
        )

def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False
    csv_name = re.sub(r'[<>:"/|?*]', "", row["name"].replace(" ", "-"))
    scrapeops_proxy_url = get_scrapeops_url(url, location=location)

    while tries <= retries and not success:
        try:
            chrome_options = Options()
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")

            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

            driver.get(scrapeops_proxy_url)
            logger.info(f"Accessing URL: {url}")

            # Wait for the review cards to load
            WebDriverWait(driver, 20).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, "div[role='listitem']"))
            )

            review_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']")
            review_pipeline = DataPipeline(csv_filename=f"{csv_name}.csv")

            for review_card in review_cards:
                name = review_card.find_element(By.TAG_NAME, "h3").text
                stars = len(review_card.find_elements(By.TAG_NAME, "svg"))
                spans = review_card.find_elements(By.TAG_NAME, "span")
                review = spans[-1].text if spans else "No review available"

                review_data = ReviewData(
                    name=name,
                    stars=stars,
                    review=review
                )
                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True
            logger.info(f"Successfully parsed: {url}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {url}")
            logger.warning(f"Retries left: {retries - tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_listing,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

If you want to adjust your results, feel free to modify any of the following constants in main (a short example follows the list):

  • MAX_RETRIES: Specifies the maximum number of attempts the script will retry if a request fails.
  • MAX_THREADS: Specifies the maximum number of concurrent tasks (or threads) the script will use while scraping data.
  • PAGES: Sets the number of search result pages the scraper will try to process.
  • LOCATION: Defines the country code of the location for simulating the scraping requests.
  • keyword_list: Contains the list of phrases or keywords the script will use to search for listings on the website.
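For example, a run that crawls two result pages each for two different searches might configure main like this (the second location is purely illustrative):

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "us"

keyword_list = [
    "Myrtle Beach, South Carolina, United States",
    "Asheville, North Carolina, United States"
]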

How To Architect Our Airbnb Scraper

This project includes three scrapers in total. The two primary ones are the results crawler and the listing scraper; a small third scraper handles pagination, as described in the steps below.

  • The results crawler will execute a search and save the results.
  • The listing scraper will read the crawler’s report and then scrape reviews for each specific listing.

The steps for building our crawler are as follows:

  1. Create a function for parsing Airbnb listings.
  2. Implement pagination to scrape multiple result pages; this involves a mini-scraper.
  3. The mini-scraper will perform a search and gather links to additional pages.
  4. Data storage will be used to save data from each listing.
  5. Concurrency will allow us to scrape multiple pages simultaneously.
  6. A proxy will be integrated to bypass anti-bot measures.

Our listing scraper will be developed with the following:

  1. Write a parsing function to gather review data.
  2. Enable reading of URLs from a CSV file.
  3. Store each review’s data in a CSV file.
  4. Scrape review pages concurrently.
  5. Integrate a proxy once again to bypass anti-bot protection.

Understanding How To Scrape Airbnb

We now need to take a high-level view of our data. In the sections ahead, we should examine Airbnb pages to understand how they’re structured.

It’s necessary to look at how their URLs are created and identify where on each page our data is stored.

Step 1: How To Request Airbnb Pages

We'll use a simple GET request to locate our Airbnb search pages. Our initial search will provide the pages from which we will extract the reviews.

Each card within the search results has its own link that leads to the individual listing page, where the reviews can also be found.

The URL for our result pages begins like this:

https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes

The format follows this structure:

https://www.airbnb.com/s/{NAME-OF-SEARCH-LOCATION}/homes

You can observe this in the image below.

Airbnb Search Results Page
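If you want to build these search URLs yourself, the location string only needs the same formatting the crawler applies later: ", " becomes "--" and spaces become "-". Here is a minimal sketch with a hypothetical helper:

def build_search_url(location_keyword):
    # Mirrors the formatting used by find_pagination_urls() later in this article
    formatted_keyword = location_keyword.replace(", ", "--").replace(" ", "-")
    return f"https://www.airbnb.com/s/{formatted_keyword}/homes"

print(build_search_url("Myrtle Beach, South Carolina, United States"))
# https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes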

Below is an individual listing page. These are the pages we will look up using our CSV report. As you can see, the URL includes a series of hashes that cannot be reproduced:

https://www.airbnb.com/rooms/34653621?adults=1&children=0&enable_m3_private_room=true&infants=0&pets=0&search_mode=regular_search&check_in=2024-09-02&check_out=2024-09-07&source_impression_id=p3_1723223538_P3jJDPiXFbNNUsdP&previous_page_section_name=1000&federated_search_id=532193a1-1995-4edd-824a-5987dfa778f1

Fortunately, we will be scraping these URLs throughout our crawl.

Airbnb Listing Page


Step 2: How To Extract Data From Airbnb Results and Pages

Since we know how to retrieve these pages, we now need to identify where the data resides. All of the data on the results page is found within div cards that have a data-testid set to card-container.

We can locate them through the CSS selector "div[data-testid='card-container']". Within these cards, we can access all the additional information we need to extract. The location of this data is shown in the HTML below.

Airbnb Search Results HTML Inspection

The process of extracting our reviews is quite similar. This time, we will be targeting div elements that have a listitem role. The CSS selector we’ll utilize is "div[role='listitem']".

Check it out in the image below. From this div, we can access all the review data relevant to us.

Airbnb Listings Page HTML Inspection
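To make these selectors concrete, here is a minimal sketch that opens a results page and counts the listing cards. It assumes Chrome and a matching chromedriver are available locally; the review selector works the same way on an individual listing page.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()
driver.get("https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes")

# Give the dynamically rendered cards time to appear
WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']"))
)

cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']")
print(f"Found {len(cards)} listing cards")

# On an individual listing page, review cards are located the same way:
# driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']")

driver.quit()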


Step 3: How To Control Pagination

Handling pagination on Airbnb is quite different from most of the other scrapers in this series. Just like the individual listing pages, the paginated search URLs contain a sequence of hashes that cannot be recreated.

Here’s an example URL:

https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes?tab_id=home_tab&refinement_paths%5B%5D=%2Fhomes&query=Myrtle%20Beach%2C%20South%20Carolina%2C%20United%20States&place_id=ChIJASFVO5VoAIkRGJbQtRWxD7w&flexible_trip_lengths%5B%5D=one_week&monthly_start_date=2024-09-01&monthly_length=3&monthly_end_date=2024-12-01&search_mode=regular_search&price_filter_input_type=0&channel=EXPLORE&federated_search_session_id=dcc6f5af-f1c5-4463-8c02-7e4dcf38a02d&search_type=unknown&pagination_search=true&cursor=eyJzZWN0aW9uX29mZnNldCI6MCwiaXRlbXNfb2Zmc2V0IjoxOCwidmVyc2lvbiI6MX0%3D

To get these URLs, we’ll need to scrape them first. To do this, we’ll perform an extra GET request on the first page before initiating the scrape, then gather URLs from the page buttons, as shown in the image below.

Airbnb HTML Inspection Pagination


Step 4: Geolocated Data

To work with geolocated data, we will utilize the ScrapeOps Proxy Aggregator API and specify the country parameter.

When a country is provided to ScrapeOps, it routes us through a server located in that country.

  • For instance, "country": "us" directs ScrapeOps to make us appear in the US.
  • Similarly, "country": "uk" allows us to appear in the UK.

This setup provides us with an IP address from within the selected country.
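In practice, this is just one extra key in the proxy payload. Here is a small sketch of what the request parameters look like (the full helper function is built in the proxy step later in this article, and the key value below is a placeholder):

from urllib.parse import urlencode

payload = {
    "api_key": "YOUR-SCRAPEOPS-API-KEY",
    "url": "https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes",
    "country": "uk",   # route the request through a UK server
    "wait": 5000,
}

print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))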


Setting Up Our Airbnb Scraper Project

Create a New Project Folder

mkdir airbnb-scraper  
cd airbnb-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install selenium  
pip install webdriver-manager
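The scripts in this article read your ScrapeOps API key from a config.json file in the project folder. A minimal example (replace the placeholder with your own key):

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}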

Build An Airbnb Search Crawler

Now it’s time to code. We'll start with our crawler by writing a parser, followed by a pagination scraper.

Then, we’ll incorporate data storage, concurrency, and proxy integration. It may seem extensive, but each step will be explained in detail in the upcoming sections.


Step 1: Create Simple Search Data Parser

We’ll begin by developing a parser. This involves adding imports, error handling, retry logic, and the foundation of our structure.

Once we locate all data, we print it to the terminal. For now, it only parses the first page of the search, but later, we will modify this for pagination and data storage.

Pay special attention to the parsing function, where the actual scraping occurs.


import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def scrape_search_results(url, location, retries=3):
    # Initialize WebDriver inside the function
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # Run headless for speed
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver.get(url)
            logger.info(f"Loaded page: {url}")

            # Wait for listings to load
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']"))
            )

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']")

            for div_card in div_cards:
                description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text
                subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']")

                name = subtitle_elements[0].text
                dates = subtitle_elements[-1].text
                price = div_card.find_element(By.CSS_SELECTOR, "span div span").text
                href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
                link = f"https://www.airbnb.com{href}"

                search_data = {
                    "name": name,
                    "description": description,
                    "dates": dates,
                    "price": price,
                    "url": link
                }

                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
            tries += 1

    driver.quit()
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]

    # Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
        url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"

        scrape_search_results(url, LOCATION, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")

To parse the data:

  • We retrieve the title using div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text.
  • For the array of subtitles, we utilize div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']").
  • The link to the listing page is located with div_card.find_element(By.TAG_NAME, "a").get_attribute("href").
  • Finally, we fix the URL with link = f"https://www.airbnb.com{href}".

Step 2: Add Pagination

At this point, pagination needs to be added. This process will differ significantly from how pagination is typically added on most other sites. As reproducing pagination in an Airbnb URL is not possible, scraping the paginated links is necessary.

The function below locates all pagination links using the CSS selector, "nav[aria-label='Search results pagination']".

Here is find_pagination_urls():


def find_pagination_urls(keyword, location, pages=4, retries=3):
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    tries = 0
    success = False

    # Initialize WebDriver
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # Run headless for speed
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

    while tries <= retries and not success:
        try:
            driver.get(url)
            logger.info(f"Loaded page: {url}")

            # Wait for pagination bar to load
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']"))
            )

            pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")
            a_tags = pagination_bar.find_elements(By.TAG_NAME, "a")

            links = [url]  # Start with the first page link
            acceptable_pages = ["1", "2", "3", "4"]

            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get_attribute("href")
                    links.append(href)

            success = True
            return links

        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1

    driver.quit()
    if not success:
        raise Exception("Failed to find pagination, max retries exceeded!")

  • The driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']") locates the pagination links bar.
  • Next, using pagination_bar.find_elements(By.TAG_NAME, "a"), we locate all links on this bar.
  • Being on the first page, we include our current URL in the links array.
  • The visible page buttons navigate only up to page 4, so we create a comparison array, ["1", "2", "3", "4"], to match with the button links.

If any link button displays text found in this array, it is added to our list. After building this list, we return it and use it as input for our start_scrape() function.

Now, we’ll write a start_scrape() function to accept a list of URLs and to call scrape_search_results() for each URL in url_list with a simple for loop.


def start_scrape(url_list, location, retries=3):
    for url in url_list:
        scrape_search_results(url, location, retries=retries)

Here is our code after putting the above steps together:


import os
import json
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries = 0
success = False

# Initialize WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless for speed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Loaded page: {url}")

# Wait for pagination bar to load
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']"))
)

pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_elements(By.TAG_NAME, "a")

links = [url] # Start with the first page link
acceptable_pages = ["1", "2", "3", "4"]

for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
href = a.get_attribute("href")
links.append(href)

success = True
return links

except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1

driver.quit()
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")

def scrape_search_results(url, location, retries=3):
tries = 0
success = False

# Initialize WebDriver inside the function
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless for speed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Loaded page: {url}")

# Wait for listings to load
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']"))
)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']")

for div_card in div_cards:
description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text
subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']")

name = subtitle_elements[0].text
dates = subtitle_elements[-1].text
price = div_card.find_element(By.CSS_SELECTOR, "span div span").text
href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
link = f"https://www.airbnb.com{href}"

search_data = {
"name": name,
"description": description,
"dates": dates,
"price": price,
"url": link
}

print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
tries += 1

driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(url_list, location, retries=3):
for url in url_list:
scrape_search_results(url, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

# INPUT ---> List of keywords to scrape
keyword_list = ["Myrtle Beach, South Carolina, United States"]

# Job Processes
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")

page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

start_scrape(page_urls, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")

First, we scrape the URLs for pagination. Next, we execute start_scrape() to apply scrape_search_results() to each URL in the generated list.


Step 3: Storing the Scraped Data

To store the data we've scraped, we need to add a DataPipeline and a dataclass.

We'll name the dataclass SearchData. This SearchData is passed into the DataPipeline, which transfers our data to a CSV file and removes any duplicate results.

Here is our SearchData dataclass:


@dataclass
class SearchData:
    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "":
                setattr(self, field.name, f"No {field.name}")
            else:
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here is the DataPipeline below:


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()

Here is the updated code:


import os
import csv
import json
import logging
import time
from dataclasses import dataclass, fields, asdict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Data class to hold scraped information
@dataclass
class SearchData:
name: str = ""
description: str = ""
dates: str = ""
price: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

# Pipeline class to manage data storage and duplicates
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = list(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries, success = 0, False
links = [url]

while tries < retries and not success:
try:
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless for speed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
driver.get(url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")))

pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_elements(By.TAG_NAME, "a")
acceptable_pages = ["1", "2", "3", "4"]

for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
link = a.get_attribute("href")
if link:
links.append(link)

success = True
driver.quit()
except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1
driver.quit()
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")

return links

def scrape_search_results(url, location,data_pipeline=None, retries=3):
tries = 0
success = False



while tries <= retries and not success:
try:
# Initialize WebDriver inside the function
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless for speed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
driver.get(url)
logger.info(f"Loaded page: {url}")

# Wait for listings to load
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']"))
)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']")

for div_card in div_cards:
description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text
subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']")

name = subtitle_elements[0].text
dates = subtitle_elements[-1].text
price = div_card.find_element(By.CSS_SELECTOR, "span div span").text
href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
link = f"https://www.airbnb.com{href}"

search_data = SearchData(
name=name,
description=description,
dates=dates,
price=price,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
tries += 1

driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(url_list, location, data_pipeline=None, retries=3):
for url in url_list:
scrape_search_results(url, location, data_pipeline=data_pipeline, retries=retries)

if __name__ == "__main__":
MAX_RETRIES = 3
PAGES = 4
LOCATION = "us"
logger.info(f"Crawl starting...")

keyword_list = ["Myrtle Beach, South Carolina, United States"]
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")
page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()

logger.info(f"Crawl complete.")
  • Before beginning our scrape, a new DataPipeline is created.
  • This DataPipeline is then passed into start_scrape(), which subsequently forwards it to scrape_search_results().
  • Within our parsing function, a SearchData object is created and directed into the pipeline.
  • After the crawl completes, the pipeline is closed using crawl_pipeline.close_pipeline().
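
As a quick standalone illustration of that flow, the pipeline can be exercised by hand without any scraping; the listing values below are made up purely for the example.

pipeline = DataPipeline(csv_filename="pipeline-test.csv")

pipeline.add_data(SearchData(
    name="Example host",
    description="Oceanfront condo",
    dates="Sep 2 - 7",
    price="$120 per night",
    url="https://www.airbnb.com/rooms/00000000"
))

# A second item with the same name is logged and dropped as a duplicate
pipeline.add_data(SearchData(name="Example host"))

pipeline.close_pipeline()  # flushes the queue to pipeline-test.csv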

Step 4: Adding Concurrency

In this section, we'll add concurrency using ThreadPoolExecutor, which opens a new pool of up to max_threads threads.

It then calls a function on each of these open threads and passes arguments to it.

This approach is significantly faster than using a simple for loop.
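
If executor.map() is new to you, here is a tiny self-contained example of the pattern, unrelated to Airbnb, showing how extra arguments are passed as parallel lists:

import concurrent.futures

def greet(name, punctuation):
    return f"Hello, {name}{punctuation}"

names = ["Anna", "Ben", "Chris"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each positional argument gets its own list, just like in start_scrape()
    results = executor.map(greet, names, ["!"] * len(names))

print(list(results))  # ['Hello, Anna!', 'Hello, Ben!', 'Hello, Chris!']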

Here is our updated start_scrape().


def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            url_list,
            [location] * len(url_list),
            [data_pipeline] * len(url_list),
            [retries] * len(url_list)
        )

If you examine executor.map(), you will observe the following:

  • The function we want to execute on available threads is scrape_search_results.
  • The list we aim to apply the function to is url_list.
  • All additional arguments are passed as arrays.

Here is our Python script, now fully updated.


import os
import csv
import json
import logging
import time
from dataclasses import dataclass, fields, asdict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Data class to hold scraped information
@dataclass
class SearchData:
name: str = ""
description: str = ""
dates: str = ""
price: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

# Pipeline class to manage data storage and duplicates
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = list(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries, success = 0, False
links = [url]

while tries < retries and not success:
try:
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless for speed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
driver.get(url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")))

pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_elements(By.TAG_NAME, "a")
acceptable_pages = ["1", "2", "3", "4"]

for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
link = a.get_attribute("href")
if link:
links.append(link)

success = True
driver.quit()
except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1
driver.quit()
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")

return links

def scrape_search_results(url, location,data_pipeline=None, retries=3):
tries = 0
success = False



while tries <= retries and not success:
try:
# Initialize WebDriver inside the function
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless for speed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
driver.get(url)
logger.info(f"Loaded page: {url}")

# Wait for listings to load
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']"))
)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']")

for div_card in div_cards:
description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text
subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']")

name = subtitle_elements[0].text
dates = subtitle_elements[-1].text
price = div_card.find_element(By.CSS_SELECTOR, "span div span").text
href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
link = f"https://www.airbnb.com{href}"

search_data = SearchData(
name=name,
description=description,
dates=dates,
price=price,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
tries += 1

driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
url_list,
[location] * len(url_list),
[data_pipeline] * len(url_list),
[retries] * len(url_list)
)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 4

LOCATION = "us"
logger.info(f"Crawl starting...")

keyword_list = ["Myrtle Beach, South Carolina, United States"]
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")
page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS,retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()

logger.info(f"Crawl complete.")

Now that concurrency is in place, we only need to integrate with a proxy, and production readiness will be achieved.


Step 5: Bypassing Anti-Bots

To avoid anti-bots, we'll employ a specific function. This function requires an API key, a URL, and a few additional parameters, which it will combine to form a ScrapeOps proxied URL.

This function will be named get_scrapeops_url(), as shown below.


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

  • Our ScrapeOps API key is represented by "api_key."
  • The URL we want to scrape is specified by "url".
  • The country we wish to route through is contained in "country".
  • "Wait" instructs ScrapeOps to pause for a specified time before returning our result, allowing the page's content to load.

We include this in our parsing function, making us ready to start scraping!


import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
from dataclasses import dataclass, fields, asdict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Data class to hold scraped information
@dataclass
class SearchData:
name: str = ""
description: str = ""
dates: str = ""
price: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

# Pipeline class to manage data storage and duplicates
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = list(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries, success = 0, False
links = [url]

while tries < retries and not success:
try:
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless for speed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--disable-extensions")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")


driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

driver.get(url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")))

pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_elements(By.TAG_NAME, "a")
acceptable_pages = ["1", "2", "3", "4"]

for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
link = a.get_attribute("href")
if link:
links.append(link)

success = True
driver.quit()
except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1
driver.quit()
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")

return links

def scrape_search_results(url, location, data_pipeline=None, retries=3):
tries = 0
success = False
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

while tries <= retries and not success:
try:
# Initialize WebDriver inside the function
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

driver.get(scrapeops_proxy_url)
logger.info(f"Loaded page: {url}")

# Wait for listings to load
WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']"))
)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']")

for div_card in div_cards:
description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text
subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']")

name = subtitle_elements[0].text if len(subtitle_elements) > 0 else "No Name"
dates = subtitle_elements[-1].text if len(subtitle_elements) > 1 else "No Dates"
price = div_card.find_element(By.CSS_SELECTOR, "span div span").text if div_card.find_elements(By.CSS_SELECTOR, "span div span") else "No Price"
href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")

# Remove the proxy URL part and construct the original Airbnb URL
original_url = href.replace("https://proxy.scrapeops.io/", "https://www.airbnb.com/")

search_data = SearchData(
name=name,
description=description,
dates=dates,
price=price,
url=original_url # Use the cleaned URL
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}")
tries += 1
finally:
driver.quit() # Ensures driver is closed on each attempt

if not success:
raise Exception(f"Max retries exceeded for: {url}")

def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
url_list,
[location] * len(url_list),
[data_pipeline] * len(url_list),
[retries] * len(url_list)
)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 4

LOCATION = "us"
logger.info(f"Crawl starting...")

keyword_list = ["Myrtle Beach, South Carolina, United States"]
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")
page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline,max_threads=MAX_THREADS,retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()

logger.info(f"Crawl complete.")


Step 6: Production Run

This is our final main block. You can adjust MAX_THREADS, MAX_RETRIES, PAGES, LOCATION, or keyword_list if you want to modify your results. We will set PAGES to 4, the maximum number of pages our pagination scraper collects.


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4

    LOCATION = "us"
    logger.info(f"Crawl starting...")

    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")
        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()

    logger.info(f"Crawl complete.")

The crawl completed in 44.319 seconds.

As you may recall, the wait time is set to 5 seconds, meaning at least 5 seconds were used for the pagination scrape. This leaves 39.319 seconds for the actual crawling (44.319 - 5).

Dividing 39.319 seconds by 4 pages gives 9.83 seconds per page.


Build An Airbnb Scraper

Now that crawling has been successfully set up, we will proceed to build our scraper. This scraper will read data from a CSV file, parsing each listing from it individually.

After parsing a listing, it should save the extracted information into a new CSV. For efficiency and speed, the entire process should operate concurrently. Additionally, this scraper should use a proxy to avoid being blocked.


Step 1: Create Simple Business Data Parser

To begin, let's develop our parsing function. Using driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']"), we first locate all review cards.

We then iterate through these cards, pulling the name, stars, and review from each one. These objects represent the data we aim to store for future review.


def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            chrome_options = Options()
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")

            driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
            driver.get(url)
            logger.info(f"Accessing URL: {url}")

            # Wait for the review cards to load
            WebDriverWait(driver, 20).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, "div[role='listitem']"))
            )

            review_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']")
            for review_card in review_cards:
                name = review_card.find_element(By.TAG_NAME, "h3").text
                stars = len(review_card.find_elements(By.TAG_NAME, "svg"))
                spans = review_card.find_elements(By.TAG_NAME, "span")
                review = spans[-1].text if spans else "No review available"

                review_data = {
                    "name": name,
                    "stars": stars,
                    "review": review
                }

                print(review_data)

            success = True
            logger.info(f"Successfully parsed: {url}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {url}")
            logger.warning(f"Retries left: {retries - tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") locates the list of review cards. From each review, we extract the following details:

  • name
  • stars
  • review
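
When storage is added for these reviews, the three fields map directly onto a small dataclass; this is the same ReviewData class already shown in the finished script at the top of the article:

from dataclasses import dataclass

@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    review: str = ""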

Step 2: Loading URLs To Scrape

To apply our parsing function, a URL must be provided. We will create a new function, similar to start_scrape(). The primary distinction is that this function will read a CSV file before invoking the parsing function.

Here is process_results().

  • Initially, the CSV file is opened and read into an array called reader.
  • Once the array is prepared, we loop through it and invoke process_listing().

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_listing(row, location, retries=retries)

Here is the updated code:


import os
import csv
import json
import logging
import time
from urllib.parse import urlencode
from dataclasses import dataclass, fields, asdict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
description: str = ""
dates: str = ""
price: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = list(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries, success = 0, False
links = [url]

while tries < retries and not success:
try:
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless for speed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--disable-extensions")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")


driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

driver.get(url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")))

pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_elements(By.TAG_NAME, "a")
acceptable_pages = ["1", "2", "3", "4"]

for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
link = a.get_attribute("href")
if link:
links.append(link)

success = True
driver.quit()
except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1
driver.quit()
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")

return links

def scrape_search_results(url, location, data_pipeline=None, retries=3):
tries = 0
success = False
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

while tries <= retries and not success:
try:
# Initialize WebDriver inside the function
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

driver.get(scrapeops_proxy_url)
logger.info(f"Loaded page: {url}")

# Wait for listings to load
WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']"))
)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']")

for div_card in div_cards:
description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text
subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']")

name = subtitle_elements[0].text if len(subtitle_elements) > 0 else "No Name"
dates = subtitle_elements[-1].text if len(subtitle_elements) > 1 else "No Dates"
price = div_card.find_element(By.CSS_SELECTOR, "span div span").text if div_card.find_elements(By.CSS_SELECTOR, "span div span") else "No Price"
href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")

# Remove the proxy URL part and construct the original Airbnb URL
original_url = href.replace("https://proxy.scrapeops.io/", "https://www.airbnb.com/")

search_data = SearchData(
name=name,
description=description,
dates=dates,
price=price,
url=original_url # Use the cleaned URL
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}")
tries += 1
finally:
driver.quit() # Ensures driver is closed on each attempt

if not success:
raise Exception(f"Max retries exceeded for: {url}")

def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
url_list,
[location] * len(url_list),
[data_pipeline] * len(url_list),
[retries] * len(url_list)
)

def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
try:
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
driver.get(url)
logger.info(f"Accessing URL: {url}")

# Wait for the review cards to load
WebDriverWait(driver, 20).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "div[role='listitem']"))
)

review_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']")
for review_card in review_cards:
name = review_card.find_element(By.TAG_NAME, "h3").text
stars = len(review_card.find_elements(By.TAG_NAME, "svg"))
spans = review_card.find_elements(By.TAG_NAME, "span")
review = spans[-1].text if spans else "No review available"

review_data = {
"name": name,
"stars": stars,
"review": review
}

print(review_data)

success = True
logger.info(f"Successfully parsed: {url}")

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {url}")
logger.warning(f"Retries left: {retries - tries}")
tries += 1
finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_listing(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["Myrtle Beach, South Carolina, United States"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")

page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES) # Ensure file is a string path

We now read our CSV into an array. Once the array has been built, we loop through it and call process_listing() on each row.
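
Each row that csv.DictReader yields is a plain dict keyed by the crawler's CSV header (the SearchData fields), which is why process_listing() can read row["url"] directly. A row looks something like this (the values below are purely illustrative):

example_row = {
    "name": "Oceanfront Condo with Balcony",  # illustrative values only
    "description": "Condo in Myrtle Beach",
    "dates": "Nov 3 - 8",
    "price": "$120 night",
    "url": "https://www.airbnb.com/rooms/12345678"
}
print(example_row["url"])  # the URL process_listing() navigates to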


Step 3: Storing the Scraped Data

At this stage, storing our data becomes quite straightforward. Our DataPipeline is already set up; we simply need to provide it with a new dataclass.

This new dataclass will represent the review objects we've been parsing in the previous examples. We’ll name this dataclass ReviewData.

Below is the definition of our new ReviewData class.


@dataclass
class ReviewData:
name: str = ""
stars: int = 0
review: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

Here is the updated code:


import os
import re
import csv
import json
import logging
import time
from urllib.parse import urlencode
from dataclasses import dataclass, fields, asdict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
description: str = ""
dates: str = ""
price: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
stars: int = 0
review: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = list(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries, success = 0, False
links = [url]

while tries < retries and not success:
try:
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless for speed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--disable-extensions")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")


driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

driver.get(url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")))

pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_elements(By.TAG_NAME, "a")
acceptable_pages = ["1", "2", "3", "4"]

for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
link = a.get_attribute("href")
if link:
links.append(link)

success = True
driver.quit()
except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1
driver.quit()
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")

return links

def scrape_search_results(url, location, data_pipeline=None, retries=3):
tries = 0
success = False
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

while tries <= retries and not success:
try:
# Initialize WebDriver inside the function
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

driver.get(scrapeops_proxy_url)
logger.info(f"Loaded page: {url}")

# Wait for listings to load
WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']"))
)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']")

for div_card in div_cards:
description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text
subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']")

name = subtitle_elements[0].text if len(subtitle_elements) > 0 else "No Name"
dates = subtitle_elements[-1].text if len(subtitle_elements) > 1 else "No Dates"
price = div_card.find_element(By.CSS_SELECTOR, "span div span").text if div_card.find_elements(By.CSS_SELECTOR, "span div span") else "No Price"
href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")

# Remove the proxy URL part and construct the original Airbnb URL
original_url = href.replace("https://proxy.scrapeops.io/", "https://www.airbnb.com/")

search_data = SearchData(
name=name,
description=description,
dates=dates,
price=price,
url=original_url # Use the cleaned URL
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}")
tries += 1
finally:
driver.quit() # Ensures driver is closed on each attempt

if not success:
raise Exception(f"Max retries exceeded for: {url}")

def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
url_list,
[location] * len(url_list),
[data_pipeline] * len(url_list),
[retries] * len(url_list)
)

def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False
csv_name = re.sub(r'[<>:"/|?*]', "", row["name"].replace(" ", "-"))

while tries <= retries and not success:
try:
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
driver.get(url)
logger.info(f"Accessing URL: {url}")

# Wait for the review cards to load
WebDriverWait(driver, 20).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "div[role='listitem']"))
)



review_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']")
review_pipeline = DataPipeline(csv_filename=f"{csv_name}.csv")

for review_card in review_cards:
name = review_card.find_element(By.TAG_NAME, "h3").text
stars = len(review_card.find_elements(By.TAG_NAME, "svg"))
spans = review_card.find_elements(By.TAG_NAME, "span")
review = spans[-1].text if spans else "No review available"

review_data = ReviewData(
name=name,
stars=stars,
review=review
)
review_pipeline.add_data(review_data)

review_pipeline.close_pipeline()
success = True
logger.info(f"Successfully parsed: {url}")

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {url}")
logger.warning(f"Retries left: {retries - tries}")
tries += 1
finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_listing(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["Myrtle Beach, South Carolina, United States"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")

page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES) # Ensure file is a string path

Inside process_listing(), we now open a new DataPipeline for each listing and pass our ReviewData objects into it.

Once the parsing operation is complete, we close the pipeline and exit the function.
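
Here is a minimal, Selenium-free sketch of that flow; the listing filename and the review values are made up purely for illustration:

# Hypothetical example: feed a couple of ReviewData objects through a DataPipeline.
review_pipeline = DataPipeline(csv_filename="Example-Listing.csv")

# add_data() drops duplicate reviewer names and flushes once the queue limit is hit
review_pipeline.add_data(ReviewData(name="Alice", stars=5, review="Great stay!"))
review_pipeline.add_data(ReviewData(name="Bob", stars=4, review="Clean and quiet."))

# close_pipeline() writes anything still sitting in the storage queue to the CSV
review_pipeline.close_pipeline()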


Step 4: Adding Concurrency

Next, we’ll introduce concurrency to our scraper. The refactoring of process_results() will follow the same approach as used in start_scrape().

To do this, we’ll use ThreadPoolExecutor once more. The first argument, process_listing, specifies the function we intend to run across available threads.

The array of listings is reader, and every other argument is passed in as an array of matching length, just like before (a toy illustration of this pattern follows the function below).


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_listing,
reader,
[location] * len(reader),
[retries] * len(reader)
)
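
If the executor.map() call looks unusual, remember that it simply zips its argument lists together and makes one call per element. Here is a toy version of the same pattern, using hypothetical values that have nothing to do with the scraper:

import concurrent.futures

def greet(name, location, retries):
    return f"{name} / {location} / {retries}"

rows = ["listing-1", "listing-2", "listing-3"]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(greet, rows, ["us"] * len(rows), [3] * len(rows))
    print(list(results))
# ['listing-1 / us / 3', 'listing-2 / us / 3', 'listing-3 / us / 3']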


Step 5: Bypassing Anti-Bots

At this point, bypassing anti-bots takes very little extra work. We only need to convert one URL into a ScrapeOps proxied URL before handing it to driver.get().

scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)

This action changes our URL to a proxied one.
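
Under the hood, get_scrapeops_url() simply URL-encodes the target page and our settings into the ScrapeOps endpoint's query string. With a placeholder API key and a hypothetical listing URL, the proxied URL looks roughly like this:

proxy_url = get_scrapeops_url("https://www.airbnb.com/rooms/12345678", location="us")
print(proxy_url)
# https://proxy.scrapeops.io/v1/?api_key=YOUR-API-KEY&url=https%3A%2F%2Fwww.airbnb.com%2Frooms%2F12345678&country=us&wait=5000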

Here is the updated code:


import os
import re
import csv
import json
import logging
import time
from urllib.parse import urlencode
from dataclasses import dataclass, fields, asdict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str = ""
description: str = ""
dates: str = ""
price: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str) and getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
else:
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
stars: int = 0
review: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = list(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries, success = 0, False
links = [url]

while tries < retries and not success:
try:
chrome_options = Options()
chrome_options.add_argument("--headless") # Run headless for speed
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--disable-extensions")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")


driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

driver.get(url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")))

pagination_bar = driver.find_element(By.CSS_SELECTOR, "nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_elements(By.TAG_NAME, "a")
acceptable_pages = ["1", "2", "3", "4"]

for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
link = a.get_attribute("href")
if link:
links.append(link)

success = True
driver.quit()
except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1
driver.quit()
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")

return links

def scrape_search_results(url, location, data_pipeline=None, retries=3):
tries = 0
success = False
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

while tries <= retries and not success:
try:
# Initialize WebDriver inside the function
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

driver.get(scrapeops_proxy_url)
logger.info(f"Loaded page: {url}")

# Wait for listings to load
WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='card-container']"))
)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='card-container']")

for div_card in div_cards:
description = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='listing-card-title']").text
subtitle_elements = div_card.find_elements(By.CSS_SELECTOR, "div[data-testid='listing-card-subtitle']")

name = subtitle_elements[0].text if len(subtitle_elements) > 0 else "No Name"
dates = subtitle_elements[-1].text if len(subtitle_elements) > 1 else "No Dates"
price = div_card.find_element(By.CSS_SELECTOR, "span div span").text if div_card.find_elements(By.CSS_SELECTOR, "span div span") else "No Price"
href = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")

# Remove the proxy URL part and construct the original Airbnb URL
original_url = href.replace("https://proxy.scrapeops.io/", "https://www.airbnb.com/")

search_data = SearchData(
name=name,
description=description,
dates=dates,
price=price,
url=original_url # Use the cleaned URL
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries - 1}")
tries += 1
finally:
driver.quit() # Ensures driver is closed on each attempt

if not success:
raise Exception(f"Max retries exceeded for: {url}")

def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
url_list,
[location] * len(url_list),
[data_pipeline] * len(url_list),
[retries] * len(url_list)
)

def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False
csv_name = re.sub(r'[<>:"/|?*]', "", row["name"].replace(" ", "-"))
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

while tries <= retries and not success:
try:
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)

driver.get(scrapeops_proxy_url)
logger.info(f"Accessing URL: {url}")

# Wait for the review cards to load
WebDriverWait(driver, 20).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "div[role='listitem']"))
)



review_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']")
review_pipeline = DataPipeline(csv_filename=f"{csv_name}.csv")

for review_card in review_cards:
name = review_card.find_element(By.TAG_NAME, "h3").text
stars = len(review_card.find_elements(By.TAG_NAME, "svg"))
spans = review_card.find_elements(By.TAG_NAME, "span")
review = spans[-1].text if spans else "No review available"

review_data = ReviewData(
name=name,
stars=stars,
review=review
)
review_pipeline.add_data(review_data)

review_pipeline.close_pipeline()
success = True
logger.info(f"Successfully parsed: {url}")

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {url}")
logger.warning(f"Retries left: {retries - tries}")
tries += 1
finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_listing,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["Myrtle Beach, South Carolina, United States"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")

page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)


Step 6: Production Run

It's time to start our production run. As before, you can adjust any of the following constants (an example alternative configuration follows the list):

  • MAX_RETRIES,
  • MAX_THREADS,
  • PAGES,
  • LOCATION, and
  • keyword_list.
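
For instance, crawling a couple of UK cities across two pages of results might look like this (the keywords and country code are hypothetical examples; check that your proxy plan supports the country you choose):

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 2
LOCATION = "uk"  # hypothetical example; verify your proxy supports this country code

## INPUT ---> List of keywords to scrape
keyword_list = ["Brighton, United Kingdom", "Liverpool, United Kingdom"]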

Below, you can see our updated main. This time, PAGES is set to 4.


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 4
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["Myrtle Beach, South Carolina, United States"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")

page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

If you remember, it took 44.319 seconds to complete the initial pagination scrape and the crawl. This time around, the crawl generated a CSV file with 59 results, and the entire run (crawl plus listing scrape) finished in 1412 seconds. 1412 - 44.319 = 1367.681 seconds spent scraping listings. 1367.681 seconds / 59 listings = 23.18 seconds per listing.

This is very fast and efficient!


Legal and Ethical Considerations

When you access Airbnb, you are subject to their terms of service. You can view Airbnb's terms here.

Additionally, using a bot or scraper to interact with their site means you're also subject to the rules outlined in their robots.txt file, available here.

Scraping public data is generally considered legal, but scraping private data (such as content gated behind a login) is a completely different story.

This not only requires compliance with the site's terms but also adherence to the relevant privacy laws governing the platform's operations.

Always consult an attorney when you have questions about the legality of your scraper.


Conclusion

You’ve successfully scraped Airbnb! This project likely introduced unique challenges that extended beyond a typical scraping job.

By now, you should have developed a solid understanding of key tools like Python Selenium. You've also tackled concepts such as parsing, pagination, data storage, concurrency, and proxy integration.

If you'd like to know more about the tech stack used in this article, take a look at the links below.


More Python Web Scraping Guides

At ScrapeOps, we offer a wide range of guides and tutorials to help you enhance your web scraping skills, whether you're just starting out or already experienced.

Check out our Selenium Web Scraping Playbook and level up your skillset today!

If you want to read more of our "How To Scrape" series, check out the articles listed below.