
Scrape Airbnb With Python Requests and BeautifulSoup

How to Scrape Airbnb With Requests and BeautifulSoup

Airbnb has been operating since 2008. Once it took off, it upended both the hotel and rental industries. On Airbnb, you can search for short-stay rental properties in place of a hotel, which gives us a unique dataset to work with.

Today, we'll create a scraper project that extracts Airbnb listings and their reviews.


TLDR - How to Scrape Airbnb

If you need a scraper but you don't want to read, look no further. In the section below, we have a pre-built scraper for you to use.

  1. First, make a new project folder and add a config.json file containing your ScrapeOps API key, in the form {"api_key": "YOUR-API-KEY"}.
  2. Then make a new Python file and paste the following code into it.

import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    # Wrap the target URL so the request is routed through the ScrapeOps proxy
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write; reset the flag so later saves are not blocked
            self.csv_file_open = False
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            # Only write the header the first time the file is created
            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def find_pagination_urls(keyword, location, pages=4, retries=3):
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            a_tags = pagination_bar.find_all("a")
            # The first page is the URL we just requested
            links = []
            links.append(url)
            acceptable_pages = ["1", "2", "3", "4"]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    link = f"https://www.airbnb.com{href}"
                    links.append(link)
            success = True
            return links

        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1
    if not success:
        raise Exception("Failed to find pagination, max retries exceeded!")


def scrape_search_results(url, location, data_pipeline=None, retries=3):
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[data-testid='card-container']")

            for div_card in div_cards:
                description = div_card.select_one("div[data-testid='listing-card-title']").text
                subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")

                name = subtitle_array[0].text
                dates = subtitle_array[-1].text

                price = div_card.select_one("span div span").text
                href = div_card.find("a").get("href")
                link = f"https://www.airbnb.com{href}"

                search_data = SearchData(
                    name=name,
                    description=description,
                    dates=dates,
                    price=price,
                    url=link
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            url_list,
            [location] * len(url_list),
            [data_pipeline] * len(url_list),
            [retries] * len(url_list)
        )


def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request sits inside the try block so network errors are retried too
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.select("div[role='listitem']")
                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '-')}.csv")

                for review_card in review_cards:
                    name = review_card.find("h3").text
                    stars = len(review_card.find_all("svg"))
                    spans = review_card.find_all("span")
                    review = spans[-1].text

                    review_data = ReviewData(
                        name=name,
                        stars=stars,
                        review=review
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

If you'd like to customize your results, go ahead and change any of the following constants from main:

  • MAX_RETRIES: Sets the maximum number of retry attempts the script will make if a request fails.
  • MAX_THREADS: Sets the maximum number of threads (or concurrent tasks) that the script will use when scraping data.
  • PAGES: Determines how many pages of search results the scraper will attempt to process.
  • LOCATION: Specifies the country code for the location from which you want to simulate the scraping requests.
  • keyword_list: A list of keywords or phrases that the script will use to search for listings on the website.

How To Architect Our Airbnb Scraper

This project will actually consist of three separate scrapers. Our two main scrapers are the results crawler and the listing scraper. The third is a small pagination scraper that collects the links to each page of results.

Our result crawler will perform a search and store the results. Our listing scraper is going to read the report from our crawler, and then scrape reviews for each individual listing.

The process for building our crawler goes as follows:

  1. Write a parsing function to scrape Airbnb listings.
  2. Add pagination so we can scrape multiple pages of results. This step requires a mini-scraper. The mini-scraper is going to perform a search and extract the links to other pages.
  3. We'll use data storage to save the data from each listing.
  4. Concurrency will give us the ability to scrape multiple pages at once.
  5. Proxy integration will be used to bypass anti-bots.

We'll build our listing scraper by adding the following.

  1. Write a parsing function to extract review data.
  2. Add the ability to read URLs from our CSV file.
  3. Store the data from each review to a CSV.
  4. Concurrently scrape these review pages.
  5. Integrate with a proxy to once again get past anti-bots.

Understanding How To Scrape Airbnb

Now, we need to look at our data from a high level. In the coming sections, we'll look at Airbnb pages and see how they're built. We need to find out how their URLs are constructed and where our data is kept on each page.

Step 1: How To Request Airbnb Pages

We'll use a simple GET request to find our Airbnb search pages. Our review pages will be extracted from our initial search. Each card in the search results contains its own link to the individual listing page and therefore the reviews.

Our result pages start with a URL that looks like this:

https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes

The format would be:

https://www.airbnb.com/s/{NAME-OF-SEARCH-LOCATION}/homes

You can see this in the image below.

Airbnb Search Results Page
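
As a minimal sketch of that URL format, the helper below turns a human-readable location into a search URL using the same string replacements the scraper uses later on. The function name build_search_url is a hypothetical one chosen for this example.

```python
def build_search_url(keyword: str) -> str:
    """Format a location the way the search URL expects it.

    ", " becomes "--" and any remaining spaces become "-".
    """
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    return f"https://www.airbnb.com/s/{formatted_keyword}/homes"


search_url = build_search_url("Myrtle Beach, South Carolina, United States")
print(search_url)
# https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes
```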

Here is an individual listing page. These are the pages we'll look up using our CSV report. If you look at the URL, it contains search-specific identifiers (such as source_impression_id and federated_search_id) that we can't reproduce on our own:

https://www.airbnb.com/rooms/34653621?adults=1&children=0&enable_m3_private_room=true&infants=0&pets=0&search_mode=regular_search&check_in=2024-09-02&check_out=2024-09-07&source_impression_id=p3_1723223538_P3jJDPiXFbNNUsdP&previous_page_section_name=1000&federated_search_id=532193a1-1995-4edd-824a-5987dfa778f1

Lucky for us, we'll be scraping these URLs during our crawl.

Airbnb Listing Page


Step 2: How To Extract Data From Airbnb Results and Pages

Now that we know how to GET these pages, we need to understand where their data is located.

  • On the results page all of our data is located inside div cards with a data-testid of card-container.
  • We can find them using their CSS selector, "div[data-testid='card-container']".
  • From within these cards, we can find all the other information we need to pull.

You can see its location in the HTML below.

Airbnb Search Results HTML Inspection

Extracting our reviews is a really similar process. This time we'll be extracting div elements with the role of listitem.

Here is the CSS selector we would use: "div[role='listitem']".

Go ahead and look at it in the image below. From this div, we'll be able to pull all of our relevant review data.

Airbnb Listings Page HTML Inspection


Step 3: How To Control Pagination

Pagination with Airbnb is going to be handled very differently from some of our other scrapers in this series. Just like with our listing pages, our page URLs contain session identifiers and an encoded cursor that we can't reliably reproduce ourselves.

Here is an example URL:

https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes?tab_id=home_tab&refinement_paths%5B%5D=%2Fhomes&query=Myrtle%20Beach%2C%20South%20Carolina%2C%20United%20States&place_id=ChIJASFVO5VoAIkRGJbQtRWxD7w&flexible_trip_lengths%5B%5D=one_week&monthly_start_date=2024-09-01&monthly_length=3&monthly_end_date=2024-12-01&search_mode=regular_search&price_filter_input_type=0&channel=EXPLORE&federated_search_session_id=dcc6f5af-f1c5-4463-8c02-7e4dcf38a02d&search_type=unknown&pagination_search=true&cursor=eyJzZWN0aW9uX29mZnNldCI6MCwiaXRlbXNfb2Zmc2V0IjoxOCwidmVyc2lvbiI6MX0%3D

For us to get these URLs, we're actually going to need to scrape them beforehand.

To scrape them, we're going to GET the first page an extra time before starting the scrape and pull the URLs from the page buttons like the one you can see in the image below.

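
To get a feel for what these paginated URLs carry, the sketch below parses a shortened copy of the example URL (only a few of its parameters are kept) with the standard library. In this one example the cursor value happens to decode as base64-encoded JSON; that is an observation about this URL, not documented behavior, and values like federated_search_session_id are issued by Airbnb per search. That is why we scrape the links instead of building them.

```python
import base64
import json
from urllib.parse import urlparse, parse_qs

# Shortened copy of the example URL above; most parameters are omitted
page_url = (
    "https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes"
    "?place_id=ChIJASFVO5VoAIkRGJbQtRWxD7w"
    "&federated_search_session_id=dcc6f5af-f1c5-4463-8c02-7e4dcf38a02d"
    "&pagination_search=true"
    "&cursor=eyJzZWN0aW9uX29mZnNldCI6MCwiaXRlbXNfb2Zmc2V0IjoxOCwidmVyc2lvbiI6MX0%3D"
)

# parse_qs splits the query string and percent-decodes the values
params = parse_qs(urlparse(page_url).query)
for name, values in params.items():
    print(f"{name}: {values[0]}")

# %3D has already been decoded back to "=", so the value can be base64-decoded directly
cursor = json.loads(base64.b64decode(params["cursor"][0]))
print(cursor)
```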


Step 4: Geolocated Data

To handle geolocated data, we'll be using the ScrapeOps Proxy Aggregator API with the country parameter.

When we pass a country into ScrapeOps, they will route us through a server in that country.

  • "country": "us" tells ScrapeOps that we want to appear in the US.
  • If we want to appear in the UK, we would pass "country": "uk". This gives us an actual IP address from within the country of our choosing.
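
The sketch below shows how the country value ends up in the proxy URL. It reuses the get_scrapeops_url() function from the full scraper (without the wait option) and only builds URLs; no request is sent. The API key is a placeholder.

```python
from urllib.parse import urlencode, urlparse, parse_qs

API_KEY = "YOUR-API-KEY"  # placeholder; the real script loads this from config.json


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


target = "https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes"
us_url = get_scrapeops_url(target, location="us")
uk_url = get_scrapeops_url(target, location="uk")

# Read the country back out of each proxy URL
print(parse_qs(urlparse(us_url).query)["country"])  # ['us']
print(parse_qs(urlparse(uk_url).query)["country"])  # ['uk']
```

Because urlencode() escapes the target URL, the Airbnb address travels safely as a single query parameter.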

Setting Up Our Airbnb Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir airbnb-scraper

cd airbnb-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build An Airbnb Search Crawler

It's finally time to start coding. We'll get started with our crawler.

  1. First, we're going to write a parser.
  2. Next, we'll write a pagination scraper.
  3. Afterwards, we'll add data storage, concurrency and proxy integration.

Sound like a lot?
No worries, in the coming sections, we'll go through all of this step by step.


Step 1: Create Simple Search Data Parser

We'll start by building a parser.

  • We'll add our imports, some error handling, retry logic and other parts of our basic structure.
  • After finding all our data, we simply print it to the terminal.
  • At the moment, we can only parse the first page of the search. Later on we'll change this to accommodate pagination and data storage.
  • Pay close attention to our parsing function. This is where the actual scraping is taking place.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(url, location, retries=3):
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            # Each search result lives inside its own card container
            div_cards = soup.select("div[data-testid='card-container']")

            for div_card in div_cards:
                description = div_card.select_one("div[data-testid='listing-card-title']").text
                subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")

                name = subtitle_array[0].text
                dates = subtitle_array[-1].text

                price = div_card.select_one("span div span").text
                href = div_card.find("a").get("href")
                link = f"https://www.airbnb.com{href}"

                search_data = {
                    "name": name,
                    "description": description,
                    "dates": dates,
                    "price": price,
                    "url": link
                }

                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
        url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"

        scrape_search_results(url, LOCATION, retries=MAX_RETRIES)

    logger.info("Crawl complete.")

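To parse our data:

  • div_card.select_one("div[data-testid='listing-card-title']").text gets our title, which we store as the description.
  • We pull our array of subtitles with div_card.select("div[data-testid='listing-card-subtitle']"). The first subtitle becomes the name and the last one becomes the dates.
  • div_card.select_one("span div span").text pulls the price.
  • div_card.find("a").get("href") finds the link to the listing page.
  • We then fix the url with link = f"https://www.airbnb.com{href}", since the href is a relative path.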
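
To make these selectors concrete, here is a small sketch that runs the same extraction logic against a hand-written HTML fragment instead of a live page. The markup below is an assumption made up for illustration; real Airbnb pages are much larger and their structure may differ or change.

```python
from bs4 import BeautifulSoup

# Hand-written sample markup (an assumption for illustration, not real Airbnb HTML)
sample_html = """
<div data-testid="card-container">
  <a href="/rooms/12345"></a>
  <div data-testid="listing-card-title">Condo in Myrtle Beach</div>
  <div data-testid="listing-card-subtitle">Oceanfront Studio</div>
  <div data-testid="listing-card-subtitle">2 beds</div>
  <div data-testid="listing-card-subtitle">Sep 2 - 7</div>
  <span><div><span>$120</span></div></span>
</div>
"""

soup = BeautifulSoup(sample_html, "html.parser")
listings = []

for div_card in soup.select("div[data-testid='card-container']"):
    subtitles = div_card.select("div[data-testid='listing-card-subtitle']")
    listings.append({
        # First subtitle is used as the name, last subtitle as the dates
        "name": subtitles[0].text,
        "description": div_card.select_one("div[data-testid='listing-card-title']").text,
        "dates": subtitles[-1].text,
        "price": div_card.select_one("span div span").text,
        # The href is relative, so we prepend the domain
        "url": f"https://www.airbnb.com{div_card.find('a').get('href')}",
    })

print(listings[0])
```

Testing against a saved or hand-written fragment like this is a cheap way to check selectors before spending requests on the live site.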

Step 2: Add Pagination

Now, we need to add pagination. This is going to be quite a bit different than when we add pagination with most other sites. Since we can't reproduce the pagination in an Airbnb URL, we need to scrape the paginated links.

The function below locates the pagination bar with the CSS selector "nav[aria-label='Search results pagination']" and then collects the links inside it.

Here is find_pagination_urls().

def find_pagination_urls(keyword, location, pages=4, retries=3):
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            a_tags = pagination_bar.find_all("a")
            # The first page is the URL we just requested
            links = []
            links.append(url)
            acceptable_pages = ["1", "2", "3", "4"]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    link = f"https://www.airbnb.com{href}"
                    links.append(link)
            success = True
            return links
        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1
    if not success:
        raise Exception("Failed to find pagination, max retries exceeded!")

  • soup.select_one("nav[aria-label='Search results pagination']") finds our bar of pagination links.
  • We then find all the links on the bar with pagination_bar.find_all("a").
  • Since we're on the first page, we add our current url to the links array.
  • Our visible buttons on the page only go to page 4, so we make a string array for comparing the button links, ["1", "2", "3", "4"].
  • If a link button holds any of the text in the array above, we add it to our list.
  • Once we've got our list, we return it. We'll pass this array into our start_scrape() function.
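
The link-filtering steps above can be sketched in isolation. The example below applies the same logic to a hand-written pagination bar; both the sample markup and the helper name extract_page_links are assumptions for illustration, not Airbnb's real HTML or part of the scraper itself.

```python
from bs4 import BeautifulSoup

# Hand-written pagination bar (an assumption for illustration only)
sample_html = """
<nav aria-label="Search results pagination">
  <button aria-current="page">1</button>
  <a href="/s/Myrtle-Beach/homes?cursor=abc">2</a>
  <a href="/s/Myrtle-Beach/homes?cursor=def">3</a>
  <a href="/s/Myrtle-Beach/homes?cursor=ghi">4</a>
  <a href="/s/Myrtle-Beach/homes?cursor=xyz">15</a>
  <a href="/s/Myrtle-Beach/homes?cursor=abc" aria-label="Next"></a>
</nav>
"""


def extract_page_links(html, first_page_url, pages=4):
    """Hypothetical helper: same filtering as find_pagination_urls(), minus the request."""
    soup = BeautifulSoup(html, "html.parser")
    pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
    links = [first_page_url]  # we are already on page 1
    acceptable_pages = ["1", "2", "3", "4"]
    for a in pagination_bar.find_all("a"):
        # Skip buttons such as "15" or "Next" and stop once we have enough pages
        if a.text in acceptable_pages and len(links) < pages:
            links.append(f"https://www.airbnb.com{a.get('href')}")
    return links


first_page = "https://www.airbnb.com/s/Myrtle-Beach/homes"
for link in extract_page_links(sample_html, first_page, pages=3):
    print(link)
```

With pages=3, only the first page plus the links labeled 2 and 3 are kept; the "15" and "Next" links are ignored because their text is not in the acceptable list.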

Now, we'll make a start_scrape() function to take in a list of urls and call scrape_search_results(). It's very simple. It just takes in a url_list and uses a for loop to call scrape_search_results() on each url.

def start_scrape(url_list, location, retries=3):
    for url in url_list:
        scrape_search_results(url, location, retries=retries)

After we've put it all together, our code looks like this.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def find_pagination_urls(keyword, location, pages=4, retries=3):
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            a_tags = pagination_bar.find_all("a")
            # The first page is the URL we just requested
            links = []
            links.append(url)
            acceptable_pages = ["1", "2", "3", "4"]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    link = f"https://www.airbnb.com{href}"
                    links.append(link)
            success = True
            return links

        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1
    if not success:
        raise Exception("Failed to find pagination, max retries exceeded!")


def scrape_search_results(url, location, retries=3):
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[data-testid='card-container']")

            for div_card in div_cards:
                description = div_card.select_one("div[data-testid='listing-card-title']").text
                subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")

                name = subtitle_array[0].text
                dates = subtitle_array[-1].text

                price = div_card.select_one("span div span").text
                href = div_card.find("a").get("href")
                link = f"https://www.airbnb.com{href}"

                search_data = {
                    "name": name,
                    "description": description,
                    "dates": dates,
                    "price": price,
                    "url": link
                }

                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(url_list, location, retries=3):
    for url in url_list:
        scrape_search_results(url, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        start_scrape(page_urls, LOCATION, retries=MAX_RETRIES)
    logger.info("Crawl complete.")

  • First, we scrape our pagination urls.
  • Then, we call start_scrape() to run scrape_search_results() on each and every url generated from the list.

Step 3: Storing the Scraped Data

To store our scraped data, we'll need to add a dataclass and a DataPipeline. We'll call our dataclass SearchData. This SearchData gets passed into the DataPipeline which pipes our data to a CSV file and removes duplicate results.

Here is our SearchData.

@dataclass
class SearchData:
    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
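
As a quick illustration of what __post_init__ does, the sketch below re-declares the same dataclass (with the loop variable renamed to f so it doesn't shadow the imported field helper) and builds one item with messy input. The sample values are made up.

```python
from dataclasses import dataclass, fields


@dataclass
class SearchData:
    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for f in fields(self):
            value = getattr(self, f.name)
            if isinstance(value, str):
                if value == "":
                    # Empty fields get a readable default such as "No dates"
                    setattr(self, f.name, f"No {f.name}")
                else:
                    # Non-empty fields have surrounding whitespace removed
                    setattr(self, f.name, value.strip())


item = SearchData(name="  Oceanfront Studio \n", price="$120")
print(item.name)   # Oceanfront Studio
print(item.price)  # $120
print(item.dates)  # No dates
```

Note that a value containing only whitespace is stripped to an empty string rather than replaced with the default, since the empty check runs before the strip.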

You can view our DataPipeline below.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write; reset the flag so later saves are not blocked
            self.csv_file_open = False
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            # Only write the header the first time the file is created
            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            # Requires "import time" at the top of the script
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
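
To see the queue-and-flush behavior without running a full crawl, here is a simplified, single-threaded sketch of the same idea. MiniPipeline and Row are hypothetical names used only for this illustration, the sample data is made up, and a set is swapped in for the names_seen list. It is not a drop-in replacement for DataPipeline.

```python
import csv
import os
import tempfile
from dataclasses import dataclass, asdict, fields


@dataclass
class Row:  # hypothetical stand-in for SearchData
    name: str
    price: str


class MiniPipeline:
    """Simplified sketch: drop duplicates by name, flush the queue to CSV in batches."""

    def __init__(self, csv_filename, storage_queue_limit=2):
        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename

    def add_data(self, item):
        if item.name in self.names_seen:
            return  # duplicate name: drop the item
        self.names_seen.add(item.name)
        self.storage_queue.append(item)
        if len(self.storage_queue) >= self.storage_queue_limit:
            self.save_to_csv()

    def save_to_csv(self):
        if not self.storage_queue:
            return
        rows, self.storage_queue = self.storage_queue, []
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=[fl.name for fl in fields(rows[0])])
            if not file_exists:
                writer.writeheader()  # header is written only once
            writer.writerows(asdict(r) for r in rows)

    def close_pipeline(self):
        self.save_to_csv()  # flush whatever is left in the queue


csv_path = os.path.join(tempfile.mkdtemp(), "listings.csv")
pipeline = MiniPipeline(csv_path, storage_queue_limit=2)
for name in ["Condo A", "Condo B", "Condo A", "Condo C"]:
    pipeline.add_data(Row(name=name, price="$100"))
pipeline.close_pipeline()

with open(csv_path, newline="", encoding="utf-8") as f:
    saved = list(csv.DictReader(f))
print([row["name"] for row in saved])
```

The second "Condo A" is dropped, the first two rows are flushed when the queue hits its limit, and close_pipeline() writes the remaining row without repeating the header.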

With these added in, here is our fully updated code.

import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write; reset the flag so later saves are not blocked
            self.csv_file_open = False
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def find_pagination_urls(keyword, location, pages=4, retries=3):
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            a_tags = pagination_bar.find_all("a")
            links = []
            links.append(url)
            acceptable_pages = ["1", "2", "3", "4"]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    link = f"https://www.airbnb.com{href}"
                    links.append(link)
            success = True
            return links

        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1
    if not success:
        raise Exception("Failed to find pagination, max retries exceeded!")


def scrape_search_results(url, location, data_pipeline=None, retries=3):
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[data-testid='card-container']")

            for div_card in div_cards:
                description = div_card.select_one("div[data-testid='listing-card-title']").text
                subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")

                name = subtitle_array[0].text
                dates = subtitle_array[-1].text

                price = div_card.select_one("span div span").text
                href = div_card.find("a").get("href")
                link = f"https://www.airbnb.com{href}"

                # Store each listing as a SearchData object and hand it to the pipeline
                search_data = SearchData(
                    name=name,
                    description=description,
                    dates=dates,
                    price=price,
                    url=link
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(url_list, location, data_pipeline=None, retries=3):
    for url in url_list:
        scrape_search_results(url, location, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")

  • We now create a new DataPipeline before starting our scrape.
  • We pass that DataPipeline into start_scrape() which in turn passes it into scrape_search_results().
  • From within our parsing function, we create a SearchData object and pass it into the pipeline.
  • Once the crawl has finished, we close the pipeline with crawl_pipeline.close_pipeline().
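
The flow in the bullets above can be sketched in isolation. The snippet below is a simplified stand-in and not the article's DataPipeline: MiniPipeline, Item, and the temporary file path are hypothetical names used only for illustration. It shows the same idea of dropping duplicate names, queueing rows, and flushing whatever is left when the pipeline is closed.

```python
import csv
import os
import tempfile
from dataclasses import dataclass, asdict, fields


@dataclass
class Item:
    # Hypothetical stand-in for SearchData with fewer fields
    name: str = ""
    price: str = ""


class MiniPipeline:
    # Simplified illustration: dedupe by name, queue rows, flush to CSV
    def __init__(self, csv_filename, queue_limit=2):
        self.csv_filename = csv_filename
        self.queue_limit = queue_limit
        self.names_seen = set()
        self.queue = []

    def add_data(self, item):
        if item.name in self.names_seen:
            return  # duplicate, dropped
        self.names_seen.add(item.name)
        self.queue.append(item)
        if len(self.queue) >= self.queue_limit:
            self.flush()

    def flush(self):
        if not self.queue:
            return
        # Write the header only when the file is new or empty
        is_new = (not os.path.isfile(self.csv_filename)
                  or os.path.getsize(self.csv_filename) == 0)
        with open(self.csv_filename, "a", newline="", encoding="utf-8") as f:
            keys = [fld.name for fld in fields(self.queue[0])]
            writer = csv.DictWriter(f, fieldnames=keys)
            if is_new:
                writer.writeheader()
            for item in self.queue:
                writer.writerow(asdict(item))
        self.queue.clear()

    def close_pipeline(self):
        # Save anything still waiting in the queue
        self.flush()


def demo():
    path = os.path.join(tempfile.mkdtemp(), "demo.csv")
    pipeline = MiniPipeline(path)
    samples = [("Condo A", "$100"), ("Condo B", "$120"),
               ("Condo A", "$100"), ("Condo C", "$90")]
    for name, price in samples:
        pipeline.add_data(Item(name=name, price=price))
    pipeline.close_pipeline()
    with open(path, newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))


rows = demo()
print([row["name"] for row in rows])
```

The repeated "Condo A" row is dropped, and "Condo C" only reaches the file because close_pipeline() flushes the partially filled queue.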

Step 4: Adding Concurrency

Here, we're going to add concurrency. We'll use ThreadPoolExecutor.

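ThreadPoolExecutor opens a pool of up to max_threads worker threads. On each available thread, it calls a function and passes arguments to it. Because each call spends most of its time waiting on the network, running several pages in parallel is usually much faster than fetching them one at a time in a simple for loop.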

Here is our new start_scrape().

def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            url_list,
            [location] * len(url_list),
            [data_pipeline] * len(url_list),
            [retries] * len(url_list)
        )

If you look at executor.map(), you'll notice the following:

  • scrape_search_results is the function we want to call on available threads.
  • url_list is the list we want to run the function on.
  • All other arguments get passed in as lists with one entry per URL, so every call receives the same location, data_pipeline, and retries values.
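
To see how executor.map() pairs those lists up, here is a small sketch that runs without any network access. fake_scrape() and run_all() are hypothetical stand-ins, not part of the scraper. One difference from start_scrape() is that this version consumes the iterator returned by map(); that is where an exception raised inside a worker thread gets re-raised, so a version that never reads the results can fail silently.

```python
import concurrent.futures


def fake_scrape(url, location, retries):
    # Hypothetical stand-in for scrape_search_results(); no network access
    return f"{url} via {location} (retries={retries})"


def run_all(url_list, location="us", max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # map() takes one iterable per parameter and zips them together,
        # so each list needs one entry per URL.
        results = executor.map(
            fake_scrape,
            url_list,
            [location] * len(url_list),
            [retries] * len(url_list),
        )
        # Consuming the iterator yields results in input order and
        # re-raises any exception thrown inside a worker thread.
        return list(results)


urls = ["https://example.com/page-1", "https://example.com/page-2"]
output = run_all(urls)
print(output)
```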

Here is our fully updated Python script.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
description: str = ""
dates: str = ""
price: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_all("a")
links = []
links.append(url)
acceptable_pages = ["1", "2", "3", "4"]
for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
href = a.get("href")
link = f"https://www.airbnb.com{href}"
links.append(link)
success = True
return links

except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")




def scrape_search_results(url, location, data_pipeline=None, retries=3):
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='card-container']")


for div_card in div_cards:
descripition = div_card.select_one("div[data-testid='listing-card-title']").text
subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")

name = subtitle_array[0].text
dates = subtitle_array[-1].text

price = div_card.select_one("span div span").text
href = div_card.find("a").get("href")
link = f"https://www.airbnb.com{href}"

search_data = SearchData(
name=name,
description=descripition,
dates=dates,
price=price,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries +=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
url_list,
[location] * len(url_list),
[data_pipeline] * len(url_list),
[retries] * len(url_list)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["Myrtle Beach, South Carolina, United States"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")

page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Now that we've got concurrency, we just need to integrate with a proxy and we'll be ready for production.


Step 5: Bypassing Anti-Bots

We'll use a special function to get past anti-bots. It takes a URL and a location, combines them with our API key and some additional parameters, and returns a ScrapeOps proxied URL. We're going to call this one get_scrapeops_url().

You can view it below.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
  • "api_key" is our ScrapeOps API key.
  • "url" is the url we want to scrape.
  • "country" holds the country we want to be routed through.
  • "wait" tells ScrapeOps to wait a certain amount of time before sending back our result. This allows content to load on the page.
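
If you'd like to see what the proxied URL looks like, the sketch below builds one with a placeholder key and then decodes it again. The API_KEY value here is a dummy string, and the target URL is the one find_pagination_urls() would build for our example keyword. Nothing is sent over the network.

```python
from urllib.parse import urlencode, urlsplit, parse_qs

API_KEY = "YOUR-API-KEY"  # placeholder, not a real key


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


target = "https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes"
proxy_url = get_scrapeops_url(target)
print(proxy_url)

# Decode the query string again to confirm the target URL survived encoding
params = parse_qs(urlsplit(proxy_url).query)
print(params["url"][0])
```

urlencode() percent-encodes the target URL, so characters like : and / can't be confused with the proxy's own query string.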

We call it in both find_pagination_urls() and scrape_search_results(), passing the proxied URL to requests.get(), and we're now ready to scrape!

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
description: str = ""
dates: str = ""
price: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_all("a")
links = []
links.append(url)
acceptable_pages = ["1", "2", "3", "4"]
for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
href = a.get("href")
link = f"https://www.airbnb.com{href}"
links.append(link)
success = True
return links

except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")




def scrape_search_results(url, location, data_pipeline=None, retries=3):
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='card-container']")


for div_card in div_cards:
descripition = div_card.select_one("div[data-testid='listing-card-title']").text
subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")

name = subtitle_array[0].text
dates = subtitle_array[-1].text

price = div_card.select_one("span div span").text
href = div_card.find("a").get("href")
link = f"https://www.airbnb.com{href}"

search_data = SearchData(
name=name,
description=descripition,
dates=dates,
price=price,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries +=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
url_list,
[location] * len(url_list),
[data_pipeline] * len(url_list),
[retries] * len(url_list)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["Myrtle Beach, South Carolina, United States"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")

page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

Here is our final main. Feel free to change MAX_THREADS, MAX_RETRIES, PAGES, LOCATION or keyword_list if you'd like to adjust your results. We're going to set PAGES to 4, which is the maximum number of pages our pagination scraper returns.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

Our crawl finished in 24.595 seconds.

Crawler Results Terminal

If you remember, we have wait set to 5 seconds, so we spent at least 5 seconds waiting for the pagination scrape. 24.595 - 5 = 19.595 seconds spent actually crawling. 19.595 seconds / 4 pages ≈ 4.90 seconds per page.
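
The same arithmetic as a short snippet, using only the figures reported above. Keep in mind that the pages are fetched concurrently, so the per-page number is an average of wall-clock time rather than the latency of any single request.

```python
total_seconds = 24.595  # total crawl time reported above
wait_seconds = 5.0      # "wait" of 5000 ms on the pagination request
pages = 4

crawl_seconds = total_seconds - wait_seconds
seconds_per_page = crawl_seconds / pages

print(round(crawl_seconds, 3), round(seconds_per_page, 2))
```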


Build An Airbnb Scraper

Now that we're successfully crawling, we're going to build our scraper.

  • Our scraper needs to read a CSV, then parse each individual listing from the CSV file.
  • After parsing a listing, it should store the extracted data in a new CSV.
  • It should do all of this concurrently for speed and efficiency.
  • This scraper should also integrate with a proxy to avoid getting blocked.

Step 1: Create Simple Business Data Parser

Let's get started by creating our parsing function. We start by finding all the review cards using their CSS selector, soup.select("div[role='listitem']").

Once we have these cards, we iterate through them. On each card, we pull the name, stars, and review. These objects are the data we want to store for later review.

def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Make the request inside the try block so a network error
            # counts as a failed attempt instead of crashing the loop
            response = requests.get(url)
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.select("div[role='listitem']")

                for review_card in review_cards:
                    name = review_card.find("h3").text
                    # Each filled star is rendered as an svg element
                    stars = len(review_card.find_all("svg"))
                    spans = review_card.find_all("span")
                    # The review text sits in the last span of the card
                    review = spans[-1].text

                    review_data = {
                        "name": name,
                        "stars": stars,
                        "review": review
                    }

                    print(review_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
  • review_cards = soup.select("div[role='listitem']") finds our list of review cards.
  • In each of our reviews, we pull the following:
    • name
    • stars
    • review

Step 2: Loading URLs To Scrape

To use our parsing function, we need to feed it a url. Here, we're going to make a new function similar to start_scrape(). The main difference is that this one will first read the CSV file before calling the parsing function.

Here is process_results(). First, we open and read our CSV file into an array, reader. After we've got our array, we iterate through it and call process_listing().

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_listing(row, location, retries=retries)

You can view our full code up to this point below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
description: str = ""
dates: str = ""
price: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_all("a")
links = []
links.append(url)
acceptable_pages = ["1", "2", "3", "4"]
for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
href = a.get("href")
link = f"https://www.airbnb.com{href}"
links.append(link)
success = True
return links

except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")




def scrape_search_results(url, location, data_pipeline=None, retries=3):
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='card-container']")


for div_card in div_cards:
descripition = div_card.select_one("div[data-testid='listing-card-title']").text
subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")

name = subtitle_array[0].text
dates = subtitle_array[-1].text

price = div_card.select_one("span div span").text
href = div_card.find("a").get("href")
link = f"https://www.airbnb.com{href}"

search_data = SearchData(
name=name,
description=descripition,
dates=dates,
price=price,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries +=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
url_list,
[location] * len(url_list),
[data_pipeline] * len(url_list),
[retries] * len(url_list)
)


def process_listing(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
review_cards = soup.select("div[role='listitem']")

for review_card in review_cards:
name = review_card.find("h3").text
stars = len(review_card.find_all("svg"))
spans = review_card.find_all("span")
review = spans[-1].text

review_data = {
"name": name,
"stars": stars,
"review": review
}

print(review_data)
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_listing(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["Myrtle Beach, South Carolina, United States"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")

page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
  • We now read our CSV into an array.
  • After creating the array, we iterate through it and call process_listing() on each row from the CSV file.
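
As a rough illustration of what reader holds, the sketch below reads a two-row sample from memory instead of a file on disk. The sample rows and example.com URLs are made up; only the column names match our SearchData fields.

```python
import csv
import io

# Hypothetical two-row sample in the same column layout as SearchData
sample_csv = io.StringIO(
    "name,description,dates,price,url\n"
    "Condo A,Oceanfront condo,Nov 1 - 6,$100,https://example.com/rooms/1\n"
    "Condo B,Beach house,Nov 3 - 8,$120,https://example.com/rooms/2\n"
)

# DictReader uses the header line as keys, so each row becomes a dict
reader = list(csv.DictReader(sample_csv))

for row in reader:
    print(row["name"], "->", row["url"])
```

This is why process_listing() can simply read row["url"] to find the page it needs to fetch.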

Step 3: Storing the Scraped Data

At this point, storing our data is really simple. We already have our DataPipeline; we just need to feed it a new dataclass. This one will represent the review objects we've been parsing in the examples above. We'll call our new dataclass ReviewData.

Here is our new ReviewData class.

@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
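
To make the cleanup behavior concrete, here is a condensed copy of the class run against a made-up review. The sample values are hypothetical. String fields get stripped, empty strings get a "No <field name>" default, and the integer stars field is left alone.

```python
from dataclasses import dataclass, fields


@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            value = getattr(self, field.name)
            # Only string fields are cleaned; stars is an int and is skipped
            if isinstance(value, str):
                if value == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                setattr(self, field.name, value.strip())


# Padded name and empty review, as a scraped card might produce
sample = ReviewData(name="  Jane  ", stars=5, review="")
print(sample)
```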

You can view our fully updated code below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
description: str = ""
dates: str = ""
price: str = ""
url: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
stars: int = 0
review: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
a_tags = pagination_bar.find_all("a")
links = []
links.append(url)
acceptable_pages = ["1", "2", "3", "4"]
for a in a_tags:
if a.text in acceptable_pages and len(links) < pages:
href = a.get("href")
link = f"https://www.airbnb.com{href}"
links.append(link)
success = True
return links

except Exception as e:
logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
logger.warning(f"Exception: {e}")
tries += 1
if not success:
raise Exception("Failed to find pagination, max retries exceeded!")




def scrape_search_results(url, location, data_pipeline=None, retries=3):
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='card-container']")


for div_card in div_cards:
descripition = div_card.select_one("div[data-testid='listing-card-title']").text
subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")

name = subtitle_array[0].text
dates = subtitle_array[-1].text

price = div_card.select_one("span div span").text
href = div_card.find("a").get("href")
link = f"https://www.airbnb.com{href}"

search_data = SearchData(
name=name,
description=descripition,
dates=dates,
price=price,
url=link
)
data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries +=1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            url_list,
            [location] * len(url_list),
            [data_pipeline] * len(url_list),
            [retries] * len(url_list)
        )


def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Request inside the try block so connection errors also count as a failed attempt
            response = requests.get(url)
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.select("div[role='listitem']")
                # Each listing gets its own pipeline and its own CSV file
                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

                for review_card in review_cards:
                    name = review_card.find("h3").text
                    stars = len(review_card.find_all("svg"))
                    spans = review_card.find_all("span")
                    review = spans[-1].text

                    review_data = ReviewData(
                        name=name,
                        stars=stars,
                        review=review
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_listing(row, location, retries=retries)

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
  • We now open up a new DataPipeline from inside process_listing()
  • We pass our ReviewData objects into the pipeline.
  • Once we've finished the parsing operation, we close the pipeline and exit the function.
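
Because each listing gets its own CSV file named after row['name'], any character in the name that the filesystem treats specially (a / for example) can break the file path. The sketch below shows one way to build a safer filename. The helper listing_csv_filename and the sample listing names are hypothetical and are not part of the scraper.

```python
import re


def listing_csv_filename(listing_name: str) -> str:
    """Hypothetical helper: turn a listing name into a filesystem-safe CSV filename."""
    # Collapse every run of characters that is not a letter, digit, dot or hyphen into one hyphen
    safe = re.sub(r"[^A-Za-z0-9.-]+", "-", listing_name).strip("-")
    # Fall back to a generic name if nothing usable is left
    return f"{safe or 'listing'}.csv"


print(listing_csv_filename("Condo in Myrtle Beach"))
print(listing_csv_filename("Home/Apt in North Myrtle Beach"))
```

Note that this pattern also replaces non-ASCII characters, so it trades readable filenames for predictable ones.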

Step 4: Adding Concurrency

Now, we're going to add concurrency to our scraper. We're going to refactor process_results() exactly the same way we refactored start_scrape().

We'll use ThreadPoolExecutor to accomplish this again. Our first argument, process_listing, is the function we want to call on each available thread. reader is our array of listings. All other arguments get passed in as arrays, just like before.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
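
To see how executor.map() lines up those arrays, here is a small standalone sketch. The describe function is a hypothetical stand-in for process_listing, and the rows are made-up sample data. It only reports the arguments each call received.

```python
import concurrent.futures


def describe(row, location, retries):
    # Stand-in for process_listing(): report the arguments this call received
    return f"{row['name']} | {location} | retries={retries}"


# Made-up rows, shaped like the dicts csv.DictReader would produce
rows = [{"name": "listing-a"}, {"name": "listing-b"}, {"name": "listing-c"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # map() takes one item from each iterable per call, in order
    results = list(executor.map(
        describe,
        rows,
        ["us"] * len(rows),
        [3] * len(rows),
    ))

for line in results:
    print(line)
```

Each call gets the nth item of every array, and the results come back in input order. Wrapping the call in list() also consumes the results, which is when an exception raised inside a worker would surface. When the return value of map() is never read, those exceptions pass silently.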

Step 5: Bypassing Anti-Bots

Bypassing anti-bots at this point is super easy. We only need to change one line in process_listing(): wrapping our url in get_scrapeops_url() converts it into a ScrapeOps proxied one.

response = requests.get(get_scrapeops_url(url, location=location))
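
To make the conversion concrete, the sketch below rebuilds the proxied url the same way get_scrapeops_url() does and then decodes it again. The function name build_proxy_url, the placeholder API key, and the listing url are assumptions for illustration only. No request is sent.

```python
from urllib.parse import urlencode, urlsplit, parse_qs


def build_proxy_url(url, api_key="YOUR-API-KEY", location="us"):
    # Same payload shape as get_scrapeops_url(), with a placeholder key
    payload = {"api_key": api_key, "url": url, "country": location, "wait": 5000}
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


# Hypothetical listing url, used only to show the encoding
proxied = build_proxy_url("https://www.airbnb.com/rooms/12345")
query = parse_qs(urlsplit(proxied).query)

print(proxied)          # the target url is percent-encoded inside the query string
print(query["url"][0])  # decoding the query string recovers the original url
```

The target url travels as an ordinary query parameter, which is why urlencode() is needed: it escapes the :, / and ? characters that would otherwise be read as part of the proxy url itself.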

Our production-ready code is available below.

import os
import csv
import time
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
    name: str = ""
    stars: int = 0
    review: str = ""


    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def find_pagination_urls(keyword, location, pages=4, retries=3):
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            a_tags = pagination_bar.find_all("a")
            # The first page is the search url itself
            links = []
            links.append(url)
            acceptable_pages = ["1", "2", "3", "4"]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    link = f"https://www.airbnb.com{href}"
                    links.append(link)
            success = True
            return links

        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1
    if not success:
        raise Exception("Failed to find pagination, max retries exceeded!")




def scrape_search_results(url, location, data_pipeline=None, retries=3):
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[data-testid='card-container']")

            for div_card in div_cards:
                description = div_card.select_one("div[data-testid='listing-card-title']").text
                subtitle_array = div_card.select("div[data-testid='listing-card-subtitle']")

                # First subtitle holds the name, last subtitle holds the dates
                name = subtitle_array[0].text
                dates = subtitle_array[-1].text

                price = div_card.select_one("span div span").text
                href = div_card.find("a").get("href")
                link = f"https://www.airbnb.com{href}"

                search_data = SearchData(
                    name=name,
                    description=description,
                    dates=dates,
                    price=price,
                    url=link
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            url_list,
            [location] * len(url_list),
            [data_pipeline] * len(url_list),
            [retries] * len(url_list)
        )


def process_listing(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Request inside the try block so connection errors also count as a failed attempt
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_cards = soup.select("div[role='listitem']")
                # Each listing gets its own pipeline and its own CSV file
                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-').replace('/', '-')}.csv")

                for review_card in review_cards:
                    name = review_card.find("h3").text
                    stars = len(review_card.find_all("svg"))
                    spans = review_card.find_all("span")
                    review = spans[-1].text

                    review_data = ReviewData(
                        name=name,
                        stars=stars,
                        review=review
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_listing,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Time for our production run. Just like before, feel free to change any of the following: MAX_RETRIES, MAX_THREADS, PAGES, LOCATION, and keyword_list. You can view our updated main below. Once again, we have PAGES set to 4, and this time MAX_THREADS is set to 3.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 3
    PAGES = 4
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")

        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Here is the final result.

Scraper Performance Terminal

Earlier, the initial pagination scrape and the crawl took 24.595 seconds. This time around, the crawl generated a CSV file with 63 results. Subtracting the crawl time from the full run gives 175.977 - 24.595 = 151.382 seconds spent scraping listings, and 151.382 seconds / 63 listings = 2.4 seconds per listing.

This is very fast and efficient!
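
The same arithmetic as a short script, assuming 175.977 seconds is the total run time and 24.595 seconds is the earlier crawl time, as in the figures above. The variable names are illustrative.

```python
total_seconds = 175.977   # full production run (crawl + listing scrape)
crawl_seconds = 24.595    # pagination scrape and crawl measured earlier
listings = 63             # rows in the crawl CSV

# Time spent on the listing scrape alone
listing_seconds = total_seconds - crawl_seconds
# Average time per listing
per_listing = listing_seconds / listings

print(f"{listing_seconds:.3f} seconds scraping listings")
print(f"{per_listing:.2f} seconds per listing")
```

Swapping in the numbers from your own run gives a quick way to compare different MAX_THREADS settings.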


When you access Airbnb, you are subject to their terms of service. On top of that, when you access their site with a bot (in this case a scraper), you are subject to their robots.txt.

You can view Airbnb's terms here. Their robots.txt is available for review here.

Public data is typically legal to scrape. Private data (data gated behind a login) is a completely different story. When scraping private data, you are subject to not only the site's terms but also the privacy laws that govern that site.

Always consult an attorney when you have questions about the legality of your scraper.

Conclusion

You've scraped Airbnb. Airbnb presented some interesting challenges not typical of a regular scraping job. You should now have a decent command of both Python Requests and BeautifulSoup. Parsing, pagination, data storage, concurrency, and proxy integration should also be terms you now understand.

If you'd like to know more about the tech stack used in this article, take a look at the links below.


More Python Web Scraping Guides

At ScrapeOps, we have all sorts of learning material for both new and experienced developers.

Check out our Python Web Scraping Playbook and level up your skillset today! If you want to read more of our "How To Scrape" series, check out the articles listed below.