How to Scrape Airbnb With Requests and BeautifulSoup
Airbnb has been operating since 2008. Once it blew up, Airbnb completely upended both the hotel and rental industries. On Airbnb, you can search for short-stay rental properties in place of a hotel. This gives Airbnb a very unique dataset for us to work with.
Today, we'll create a scraper project that extracts Airbnb listings and their reviews.
- TLDR: How to Scrape Airbnb
- How To Architect Our Scraper
- Understanding How To Scrape Airbnb
- Setting Up Our Airbnb Scraper
- Build An Airbnb Search Crawler
- Build An Airbnb Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
TLDR - How to Scrape Airbnb
If you need a scraper but you don't want to read, look no further. In the section below, we have a pre-built scraper for you to use.
- First, make a new project folder and add a `config.json` file containing your ScrapeOps API key (stored under the `"api_key"` key).
- Then make a new Python file and paste the following code into it.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Build the ScrapeOps proxy URL that fetches ``url`` on our behalf.

    Args:
        url: The page we actually want to retrieve.
        location: Country code sent to the proxy as ``country``.

    Returns:
        The proxy endpoint with the API key, target url, country and wait
        value encoded into its query string.
    """
    # NOTE(review): "wait" is presumably a render delay in milliseconds;
    # confirm against the ScrapeOps documentation.
    query_string = urlencode(
        {"api_key": API_KEY, "url": url, "country": location, "wait": 5000}
    )
    return "https://proxy.scrapeops.io/v1/?" + query_string
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One listing card taken from a search results page.

    On creation every string field is stripped of surrounding whitespace and,
    if nothing is left, replaced with the placeholder ``"No <field name>"``.
    """

    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Normalize all string fields in place (strip, then default if empty)."""
        # Named ``spec`` so the imported ``dataclasses.field`` is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only text also gets the placeholder
            # instead of being stored as an empty string.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
@dataclass
class ReviewData:
    """One review taken from a listing page.

    On creation every string field is stripped of surrounding whitespace and,
    if nothing is left, replaced with the placeholder ``"No <field name>"``.
    Non-string fields such as ``stars`` are left untouched.
    """

    name: str = ""
    stars: int = 0
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Normalize all string fields in place (strip, then default if empty)."""
        # Named ``spec`` so the imported ``dataclasses.field`` is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only text also gets the placeholder
            # instead of being stored as an empty string.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
class DataPipeline:
    """Queue scraped dataclass items, drop repeats by ``name``, append to a CSV.

    Items are buffered in ``storage_queue`` and written out once the queue
    reaches ``storage_queue_limit`` or when ``close_pipeline()`` is called.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []  # names already accepted, used for de-duplication
        self.storage_queue = []  # items waiting to be written
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False  # True while save_to_csv() is running

    def save_to_csv(self):
        """Append all queued items to ``csv_filename`` and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            # Column order follows the dataclass definition of the first item.
            keys = [column.name for column in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag: an empty queue or a failed write used to
            # leave it stuck at True, which blocked every later flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write out whatever is still queued."""
        if self.csv_file_open:
            # Imported here because the file's import block has no ``import time``;
            # without it this branch raised NameError.
            import time

            # Give a save that is still in progress a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def find_pagination_urls(keyword, location, pages=4, retries=3):
    """Fetch the first search page through the proxy and collect result-page links.

    Args:
        keyword: Search text; ", " becomes "--" and spaces become "-" in the URL.
        location: Country code forwarded to ``get_scrapeops_url``.
        pages: Maximum number of URLs to return (the first page counts as one).
        retries: Number of extra attempts after the first failure.

    Returns:
        A list of URLs starting with the first-page URL, followed by the links
        of pagination buttons labelled "1" to "4", capped at ``pages`` entries.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    acceptable_pages = ["1", "2", "3", "4"]
    tries = 0

    while tries <= retries:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            if pagination_bar is None:
                # Fail with a readable reason instead of an AttributeError on None.
                raise Exception("Pagination bar not found in the response")
            a_tags = pagination_bar.find_all("a")

            # We are already on the first page, so its URL goes in first.
            links = [url]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    links.append(f"https://www.airbnb.com{href}")
            return links
        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1

    raise Exception("Failed to find pagination, max retries exceeded!")
def scrape_search_results(url, location, data_pipeline=None, retries=3):
    """Fetch one search page through the proxy and push each listing card into the pipeline.

    Args:
        url: Search results page to scrape.
        location: Country code forwarded to ``get_scrapeops_url``.
        data_pipeline: Object whose ``add_data`` receives each ``SearchData``.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    for attempt in range(retries + 1):
        try:
            response = requests.get(get_scrapeops_url(url, location=location))
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            for card in page.select("div[data-testid='card-container']"):
                title = card.select_one("div[data-testid='listing-card-title']").text
                subtitles = card.select("div[data-testid='listing-card-subtitle']")
                # The first subtitle is stored as the name, the last one as the dates.
                first_subtitle = subtitles[0].text
                last_subtitle = subtitles[-1].text
                listing_price = card.select_one("span div span").text
                listing_path = card.find("a").get("href")

                data_pipeline.add_data(
                    SearchData(
                        name=first_subtitle,
                        description=title,
                        dates=last_subtitle,
                        price=listing_price,
                        url=f"https://www.airbnb.com{listing_path}",
                    )
                )
            logger.info(f"Successfully parsed data from: {url}")
            return
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-attempt}")

    raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    """Run ``scrape_search_results`` for every url on a pool of up to ``max_threads`` threads."""
    job_count = len(url_list)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # NOTE(review): the iterator returned by map() is never consumed, so an
        # exception raised inside a worker is not re-raised here.
        executor.map(
            scrape_search_results,
            url_list,
            # map() takes one iterable per positional argument of the worker.
            [location] * job_count,
            [data_pipeline] * job_count,
            [retries] * job_count,
        )
def process_listing(row, location, retries=3):
    """Fetch one listing page through the proxy and save its reviews to a CSV.

    Args:
        row: Mapping with at least ``"url"`` and ``"name"`` keys (one row of
            the crawl report).
        location: Country code forwarded to ``get_scrapeops_url``.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request lives inside the try so connection errors are
            # retried as well, instead of escaping the retry loop.
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            review_cards = soup.select("div[role='listitem']")
            # One CSV per listing, named after the row's name with spaces and
            # slashes replaced by dashes.
            review_pipeline = DataPipeline(
                csv_filename=f"{row['name'].replace(' ', '-').replace('/', '-')}.csv"
            )

            for review_card in review_cards:
                name = review_card.find("h3").text
                # The star rating is taken as the number of <svg> elements in the card.
                stars = len(review_card.find_all("svg"))
                spans = review_card.find_all("span")
                review = spans[-1].text

                review_data = ReviewData(name=name, stars=stars, review=review)
                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl report and run ``process_listing`` for each row on a thread pool.

    Args:
        csv_file: Path of the CSV written by the crawl.
        location: Country code passed on to ``process_listing``.
        max_threads: Maximum number of worker threads.
        retries: Retry count passed on to ``process_listing``.
    """
    logger.info(f"processing {csv_file}")
    # The report is written as UTF-8, so read it back the same way instead of
    # relying on the platform's default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # NOTE(review): the iterator returned by map() is never consumed, so an
        # exception raised inside a worker is not re-raised here.
        executor.map(
            process_listing,
            reader,
            [location] * len(reader),
            [retries] * len(reader),
        )
if __name__ == "__main__":
    # Script entry point: crawl the search pages for each keyword, then scrape
    # the reviews of every listing found.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4
    LOCATION = "us"
    logger.info(f"Crawl starting...")
    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    # CSV reports produced by the crawl, consumed by the review scraper below.
    aggregate_files = []
    ## Job Processes
    for keyword in keyword_list:
        # The report file name uses single dashes in place of ", " and spaces.
        filename = keyword.replace(", ", "-").replace(" ", "-")
        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        # Flush whatever is still queued before the report is read back.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
    # Second stage: scrape reviews for every listing in each report.
    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
If you'd like to customize your results, go ahead and change any of the following constants from `main`:
- `MAX_RETRIES`: Sets the maximum number of retry attempts the script will make if a request fails.
- `MAX_THREADS`: Sets the maximum number of threads (or concurrent tasks) that the script will use when scraping data.
- `PAGES`: Determines how many pages of search results the scraper will attempt to process.
- `LOCATION`: Specifies the country code for the location from which you want to simulate the scraping requests.
- `keyword_list`: A list of keywords or phrases that the script will use to search for listings on the website.
How To Architect Our Airbnb Scraper
This project will actually consist of three separate scrapers. Our two main scrapers are the results crawler and the listing scraper.
Our result crawler will perform a search and store the results. Our listing scraper is going to read the report from our crawler, and then scrape reviews for each individual listing.
The process for building our crawler goes as follows:
- Write a parsing function to scrape Airbnb listings.
- Add pagination so we can scrape multiple pages of results. This step requires a mini-scraper. The mini-scraper is going to perform a search and extract the links to other pages.
- We'll use data storage to save the data from each listing.
- Concurrency will give us the ability to scrape multiple pages at once.
- Proxy integration will be used to bypass anti-bots.
We'll build our listing scraper by adding the following.
- Write a parsing function to extract review data.
- Add the ability to read urls from our CSV file.
- Store the data from each review to a CSV.
- Concurrently scrape these review pages.
- Integrate with a proxy to once again get past anti-bots.
Understanding How To Scrape Airbnb
Now, we need to get a look at our data from a high level. In these coming sections, we need to look at Airbnb pages and see how they're built. We need to find how their urls are constructed and we need to find where our data is being kept on each page.
Step 1: How To Request Airbnb Pages
We'll use a simple GET request to find our Airbnb search pages. Our review pages will be extracted from our initial search. Each card in the search results contains its own link to the individual listing page and therefore the reviews.
Our result pages start with a URL that looks like this:
https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes
The format would be:
https://www.airbnb.com/s/{NAME-OF-SEARCH-LOCATION}/homes
You can see this in the image below.
Here is an individual listing page. These are the pages we'll look up using our CSV report. If you look at the URL, it contains a series of hashes which are impossible to reproduce:
https://www.airbnb.com/rooms/34653621?adults=1&children=0&enable_m3_private_room=true&infants=0&pets=0&search_mode=regular_search&check_in=2024-09-02&check_out=2024-09-07&source_impression_id=p3_1723223538_P3jJDPiXFbNNUsdP&previous_page_section_name=1000&federated_search_id=532193a1-1995-4edd-824a-5987dfa778f1
Lucky for us, we'll be scraping these URLs during our crawl.
Step 2: How To Extract Data From Airbnb Results and Pages
Now that we know how to GET these pages, we need to understand where their data is located.
- On the results page, all of our data is located inside `div` cards with a `data-testid` of `card-container`.
- We can find them using their CSS selector, `"div[data-testid='card-container']"`.
- From within these cards, we can find all the other information we need to pull.
You can see its location in the HTML below.
Extracting our reviews is a really similar process. This time we'll be extracting div
elements with the role
of listitem
.
Here is the CSS selector we would use: "div[role='listitem']"
.
Go ahead and look at it in the image below. From this div
, we'll be able to pull all of our relevant review data.
Step 3: How To Control Pagination
Pagination with Airbnb is going to be handled very differently from some of our other scrapers in this series. Just like with our listing pages, our page URLs actually contain a series of hashes that we can't reproduce.
Here is an example URL:
https://www.airbnb.com/s/Myrtle-Beach--South-Carolina--United-States/homes?tab_id=home_tab&refinement_paths%5B%5D=%2Fhomes&query=Myrtle%20Beach%2C%20South%20Carolina%2C%20United%20States&place_id=ChIJASFVO5VoAIkRGJbQtRWxD7w&flexible_trip_lengths%5B%5D=one_week&monthly_start_date=2024-09-01&monthly_length=3&monthly_end_date=2024-12-01&search_mode=regular_search&price_filter_input_type=0&channel=EXPLORE&federated_search_session_id=dcc6f5af-f1c5-4463-8c02-7e4dcf38a02d&search_type=unknown&pagination_search=true&cursor=eyJzZWN0aW9uX29mZnNldCI6MCwiaXRlbXNfb2Zmc2V0IjoxOCwidmVyc2lvbiI6MX0%3D
For us to get these URLs, we're actually going to need to scrape them beforehand.
To scrape them, we're actually going to GET the first page an extra time before starting the scrape and pull urls from the page buttons like the one you can see in the image below.
Step 4: Geolocated Data
To handle geolocated data, we'll be using the ScrapeOps Proxy Aggregator API with the `country` parameter.
When we pass a `country` into ScrapeOps, they will route us through a server in that country.
- `"country": "us"` tells ScrapeOps that we want to appear in the US.
- If we want to appear in the UK, we would pass `"country": "uk"`.

This gives us an actual IP address from within the country of our choosing.
Setting Up Our Airbnb Scraper Project
Let's get started. You can run the following commands to get setup.
Create a New Project Folder
mkdir airbnb-scraper
cd airbnb-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build An Airbnb Search Crawler
It's finally time to start coding. We'll get started with our crawler.
- First, we're going to write a parser.
- Next, we'll write a pagination scraper.
- Afterwards, we'll add data storage, concurrency and proxy integration.
Sound like a lot?
No worries, in the coming sections, we'll go through all of this step by step.
Step 1: Create Simple Search Data Parser
We'll start by building a parser.
- We'll add our imports, some error handling, retry logic and other parts of our basic structure.
- After finding all our data, we simply print it to the terminal.
- At the moment, we can only parse the first page of the search. Later on we'll change this to accommodate pagination and data storage.
- Pay close attention to our parsing function. This is where the actual scraping is taking place.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(url, location, retries=3):
    """Fetch one search page and print a dict for every listing card found.

    Args:
        url: Search results page to scrape.
        location: Accepted for later stages; not used in this version.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    for attempt in range(retries + 1):
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            for card in page.select("div[data-testid='card-container']"):
                title = card.select_one("div[data-testid='listing-card-title']").text
                subtitles = card.select("div[data-testid='listing-card-subtitle']")
                # The first subtitle is stored as the name, the last one as the dates.
                first_subtitle = subtitles[0].text
                last_subtitle = subtitles[-1].text
                listing_price = card.select_one("span div span").text
                listing_path = card.find("a").get("href")

                print(
                    {
                        "name": first_subtitle,
                        "description": title,
                        "dates": last_subtitle,
                        "price": listing_price,
                        "url": f"https://www.airbnb.com{listing_path}",
                    }
                )
            logger.info(f"Successfully parsed data from: {url}")
            return
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-attempt}")

    raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
    # Script entry point for the first stage: parse only the first results page.
    MAX_RETRIES = 3
    # MAX_THREADS, PAGES, aggregate_files and filename are defined here but
    # not used yet in this version of the script.
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"
    logger.info(f"Crawl starting...")
    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []
    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")
        # The search URL uses a double dash in place of ", " and a single dash for spaces.
        formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
        url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
        scrape_search_results(url, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
To parse our data:
- `div_card.select_one("div[data-testid='listing-card-title']").text` gets our title.
- We pull our array of subtitles with `div_card.select("div[data-testid='listing-card-subtitle']")`.
- `div_card.find("a").get("href")` finds the link to the listing page.
- We then fix the url with `link = f"https://www.airbnb.com{href}"`.
Step 2: Add Pagination
Now, we need to add pagination. This is going to be quite a bit different than when we add pagination with most other sites. Since we can't reproduce the pagination in an Airbnb URL, we need to scrape the paginated links.
The function below finds all the pagination links using their CSS selector, "nav[aria-label='Search results pagination']"
.
Here is find_pagination_urls()
.
def find_pagination_urls(keyword, location, pages=4, retries=3):
    """Fetch the first search page and collect links to further result pages.

    Args:
        keyword: Search text; ", " becomes "--" and spaces become "-" in the URL.
        location: Accepted for later stages; not used in this version.
        pages: Maximum number of URLs to return (the first page counts as one).
        retries: Number of extra attempts after the first failure.

    Returns:
        A list of URLs starting with the first-page URL, followed by the links
        of pagination buttons labelled "1" to "4", capped at ``pages`` entries.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    acceptable_pages = ["1", "2", "3", "4"]
    tries = 0

    while tries <= retries:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            if pagination_bar is None:
                # Fail with a readable reason instead of an AttributeError on None.
                raise Exception("Pagination bar not found in the response")
            a_tags = pagination_bar.find_all("a")

            # We are already on the first page, so its URL goes in first.
            links = [url]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    links.append(f"https://www.airbnb.com{href}")
            return links
        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1

    raise Exception("Failed to find pagination, max retries exceeded!")
- `soup.select_one("nav[aria-label='Search results pagination']")` finds our bar of pagination links.
- We then find all the links on the bar with `pagination_bar.find_all("a")`.
- Since we're on the first page, we add our current url to the `links` array.
- Our visible buttons on the page only go to page 4, so we make a string array for comparing the button links, `["1", "2", "3", "4"]`.
- If a link button holds any of the text in the array above, we add it to our list.
- Once we've got our list, we `return` it. We'll pass this array into our `start_scrape()` function.
Now, we'll make a start_scrape()
function to take in a list of urls and call scrape_search_results()
. It's very simple. It just takes in a url_list
and uses a for
loop to call scrape_search_results()
on each url.
def start_scrape(url_list, location, retries=3):
    """Scrape each url in ``url_list`` one after another."""
    for page_url in url_list:
        scrape_search_results(page_url, location, retries=retries)
After we've put it all together, our code looks like this.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def find_pagination_urls(keyword, location, pages=4, retries=3):
    """Fetch the first search page and collect links to further result pages.

    Args:
        keyword: Search text; ", " becomes "--" and spaces become "-" in the URL.
        location: Accepted for later stages; not used in this version.
        pages: Maximum number of URLs to return (the first page counts as one).
        retries: Number of extra attempts after the first failure.

    Returns:
        A list of URLs starting with the first-page URL, followed by the links
        of pagination buttons labelled "1" to "4", capped at ``pages`` entries.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    acceptable_pages = ["1", "2", "3", "4"]
    tries = 0

    while tries <= retries:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            if pagination_bar is None:
                # Fail with a readable reason instead of an AttributeError on None.
                raise Exception("Pagination bar not found in the response")
            a_tags = pagination_bar.find_all("a")

            # We are already on the first page, so its URL goes in first.
            links = [url]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    links.append(f"https://www.airbnb.com{href}")
            return links
        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1

    raise Exception("Failed to find pagination, max retries exceeded!")
def scrape_search_results(url, location, retries=3):
    """Fetch one search page and print a dict for every listing card found.

    Args:
        url: Search results page to scrape.
        location: Accepted for later stages; not used in this version.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    for attempt in range(retries + 1):
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            for card in page.select("div[data-testid='card-container']"):
                title = card.select_one("div[data-testid='listing-card-title']").text
                subtitles = card.select("div[data-testid='listing-card-subtitle']")
                # The first subtitle is stored as the name, the last one as the dates.
                first_subtitle = subtitles[0].text
                last_subtitle = subtitles[-1].text
                listing_price = card.select_one("span div span").text
                listing_path = card.find("a").get("href")

                print(
                    {
                        "name": first_subtitle,
                        "description": title,
                        "dates": last_subtitle,
                        "price": listing_price,
                        "url": f"https://www.airbnb.com{listing_path}",
                    }
                )
            logger.info(f"Successfully parsed data from: {url}")
            return
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-attempt}")

    raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(url_list, location, retries=3):
    """Scrape each url in ``url_list`` one after another."""
    for page_url in url_list:
        scrape_search_results(page_url, location, retries=retries)
if __name__ == "__main__":
    # Script entry point for the pagination stage: find the page links, then
    # scrape each page in turn.
    MAX_RETRIES = 3
    # MAX_THREADS, aggregate_files and filename are defined here but not used
    # yet in this version of the script.
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"
    logger.info(f"Crawl starting...")
    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []
    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(", ", "-").replace(" ", "-")
        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)
        start_scrape(page_urls, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
- First, we scrape our pagination urls.
- Then, we call `start_scrape()` to run `scrape_search_results()` on each and every url generated from the list.
Step 3: Storing the Scraped Data
To store our scraped data, we'll need to add a dataclass
and a DataPipeline
. We'll call our dataclass
SearchData
. This SearchData
gets passed into the DataPipeline
which pipes our data to a CSV file and removes duplicate results.
Here is our SearchData
.
@dataclass
class SearchData:
    """One listing card taken from a search results page.

    On creation every string field is stripped of surrounding whitespace and,
    if nothing is left, replaced with the placeholder ``"No <field name>"``.
    """

    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Normalize all string fields in place (strip, then default if empty)."""
        # Named ``spec`` so the imported ``dataclasses.field`` is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only text also gets the placeholder
            # instead of being stored as an empty string.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
You can view our DataPipeline
below.
class DataPipeline:
    """Queue scraped dataclass items, drop repeats by ``name``, append to a CSV.

    Items are buffered in ``storage_queue`` and written out once the queue
    reaches ``storage_queue_limit`` or when ``close_pipeline()`` is called.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []  # names already accepted, used for de-duplication
        self.storage_queue = []  # items waiting to be written
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False  # True while save_to_csv() is running

    def save_to_csv(self):
        """Append all queued items to ``csv_filename`` and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            # Column order follows the dataclass definition of the first item.
            keys = [column.name for column in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag: an empty queue or a failed write used to
            # leave it stuck at True, which blocked every later flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write out whatever is still queued."""
        if self.csv_file_open:
            # Imported here because the file's import block has no ``import time``;
            # without it this branch raised NameError.
            import time

            # Give a save that is still in progress a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
With these added in, here is our fully updated code.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One listing card taken from a search results page.

    On creation every string field is stripped of surrounding whitespace and,
    if nothing is left, replaced with the placeholder ``"No <field name>"``.
    """

    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Normalize all string fields in place (strip, then default if empty)."""
        # Named ``spec`` so the imported ``dataclasses.field`` is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only text also gets the placeholder
            # instead of being stored as an empty string.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
class DataPipeline:
    """Queue scraped dataclass items, drop repeats by ``name``, append to a CSV.

    Items are buffered in ``storage_queue`` and written out once the queue
    reaches ``storage_queue_limit`` or when ``close_pipeline()`` is called.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []  # names already accepted, used for de-duplication
        self.storage_queue = []  # items waiting to be written
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False  # True while save_to_csv() is running

    def save_to_csv(self):
        """Append all queued items to ``csv_filename`` and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return
            # Column order follows the dataclass definition of the first item.
            keys = [column.name for column in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag: an empty queue or a failed write used to
            # leave it stuck at True, which blocked every later flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write out whatever is still queued."""
        if self.csv_file_open:
            # Imported here because the file's import block has no ``import time``;
            # without it this branch raised NameError.
            import time

            # Give a save that is still in progress a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def find_pagination_urls(keyword, location, pages=4, retries=3):
    """Fetch the first search page and collect links to further result pages.

    Args:
        keyword: Search text; ", " becomes "--" and spaces become "-" in the URL.
        location: Accepted for later stages; not used in this version.
        pages: Maximum number of URLs to return (the first page counts as one).
        retries: Number of extra attempts after the first failure.

    Returns:
        A list of URLs starting with the first-page URL, followed by the links
        of pagination buttons labelled "1" to "4", capped at ``pages`` entries.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{formatted_keyword}/homes"
    acceptable_pages = ["1", "2", "3", "4"]
    tries = 0

    while tries <= retries:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            pagination_bar = soup.select_one("nav[aria-label='Search results pagination']")
            if pagination_bar is None:
                # Fail with a readable reason instead of an AttributeError on None.
                raise Exception("Pagination bar not found in the response")
            a_tags = pagination_bar.find_all("a")

            # We are already on the first page, so its URL goes in first.
            links = [url]
            for a in a_tags:
                if a.text in acceptable_pages and len(links) < pages:
                    href = a.get("href")
                    links.append(f"https://www.airbnb.com{href}")
            return links
        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - tries}")
            logger.warning(f"Exception: {e}")
            tries += 1

    raise Exception("Failed to find pagination, max retries exceeded!")
def scrape_search_results(url, location, data_pipeline=None, retries=3):
    """Fetch one search page and push each listing card into the pipeline.

    Args:
        url: Search results page to scrape.
        location: Accepted for later stages; not used in this version.
        data_pipeline: Object whose ``add_data`` receives each ``SearchData``.
        retries: Number of extra attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    for attempt in range(retries + 1):
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            for card in page.select("div[data-testid='card-container']"):
                title = card.select_one("div[data-testid='listing-card-title']").text
                subtitles = card.select("div[data-testid='listing-card-subtitle']")
                # The first subtitle is stored as the name, the last one as the dates.
                first_subtitle = subtitles[0].text
                last_subtitle = subtitles[-1].text
                listing_price = card.select_one("span div span").text
                listing_path = card.find("a").get("href")

                data_pipeline.add_data(
                    SearchData(
                        name=first_subtitle,
                        description=title,
                        dates=last_subtitle,
                        price=listing_price,
                        url=f"https://www.airbnb.com{listing_path}",
                    )
                )
            logger.info(f"Successfully parsed data from: {url}")
            return
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-attempt}")

    raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(url_list, location, data_pipeline=None, retries=3):
    """Scrape every search page in `url_list`, one after another.

    Args:
        url_list: Search result page URLs to scrape.
        location: Forwarded to scrape_search_results().
        data_pipeline: Pipeline that receives the parsed listings.
        retries: Retry budget forwarded to scrape_search_results().
    """
    for url in url_list:
        # Forward the pipeline we were given; the previous code referenced an
        # undefined name (`data`) here and raised NameError on the first page.
        scrape_search_results(url, location, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
    # Crawl settings.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with separators dashed.
        filename = keyword.replace(", ", "-").replace(" ", "-")
        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        # Flush whatever is still queued before moving on.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info("Crawl complete.")
- We now create a new `DataPipeline` before starting our scrape.
- We pass that `DataPipeline` into `start_scrape()`, which in turn passes it into `scrape_search_results()`.
- From within our parsing function, we create a `SearchData` object and pass it into the pipeline.
- Once the crawl has finished, we close the pipeline with `crawl_pipeline.close_pipeline()`.
Step 4: Adding Concurrency
Here, we're going to add concurrency. We'll use `ThreadPoolExecutor`.
`ThreadPoolExecutor` opens up a pool of up to `max_threads` threads. On each of these threads, it calls a function and passes arguments to it. Because the threads wait on their network responses at the same time, this approach is much faster than a simple `for` loop.
Here is our new `start_scrape()`.
def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape every search page in `url_list` on a pool of worker threads.

    Each URL is paired with the same location, pipeline and retry budget and
    handed to scrape_search_results(). The iterator returned by map() is not
    consumed, so an exception raised inside a worker is not re-raised here.
    """
    page_count = len(url_list)
    # map() takes one iterable per positional argument of the worker.
    locations = [location] * page_count
    pipelines = [data_pipeline] * page_count
    retry_budgets = [retries] * page_count

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(scrape_search_results, url_list, locations, pipelines, retry_budgets)
If you look at `executor.map()`, you'll notice the following:
- `scrape_search_results` is the function we want to call on available threads.
- `url_list` is the list we want to run the function on.
- All other arguments get passed in as arrays.
Here is our fully updated Python script.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Read the ScrapeOps API key from config.json (the "api_key" entry).
# API_KEY is an empty string until the file has been loaded.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
## Logging
# Configure INFO-level logging and create the module logger that the
# functions in this script write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One listing card scraped from an Airbnb search results page.

    String fields are normalized on construction: surrounding whitespace is
    stripped and blank values are replaced with "No <field name>".
    """

    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only text also counts as missing.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
class DataPipeline:
    """Queue scraped dataclass items, drop repeated names, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []  # names already accepted; used to spot duplicates
        self.storage_queue = []  # items waiting to be written
        self.storage_queue_limit = storage_queue_limit  # queue size that triggers a flush
        self.csv_filename = csv_filename
        self.csv_file_open = False  # True only while save_to_csv() is writing

    def save_to_csv(self):
        """Empty the queue into the CSV file, adding a header row if the file is new or empty."""
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write: leave the flag alone so it cannot get stuck on True.
            return

        self.csv_file_open = True
        try:
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Clear the flag even when the write fails.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if this item's name was seen before; otherwise record the name."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued; call once scraping has finished."""
        # Imported here because the script's import block does not include `time`.
        import time

        if self.csv_file_open:
            # Give an in-progress write a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def find_pagination_urls(keyword, location, pages=4, retries=3):
    """Return the search-result page URLs for `keyword`.

    The first search page is always included; further URLs come from the
    pagination bar links labelled "1" to "4", capped at `pages` URLs in total.
    `location` is not used by this version.

    Raises:
        Exception: When every attempt has failed.
    """
    slug = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{slug}/homes"
    numbered_pages = ["1", "2", "3", "4"]

    for attempt in range(retries + 1):
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            nav = page.select_one("nav[aria-label='Search results pagination']")
            anchors = nav.find_all("a")
            links = [url]
            for anchor in anchors:
                if len(links) >= pages:
                    break
                if anchor.text in numbered_pages:
                    href = anchor.get("href")
                    links.append(f"https://www.airbnb.com{href}")
            return links
        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - attempt}")
            logger.warning(f"Exception: {e}")
    raise Exception("Failed to find pagination, max retries exceeded!")
def scrape_search_results(url, location, data_pipeline=None, retries=3):
    """Fetch one Airbnb search page and feed each listing card to the pipeline.

    Args:
        url: Search results page to download.
        location: Accepted for signature parity; this version does not use it.
        data_pipeline: Object whose add_data() receives one SearchData per card.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When every attempt has failed.
    """
    for attempt in range(retries + 1):
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            for card in page.select("div[data-testid='card-container']"):
                # The card title is stored as the description; the first
                # subtitle is used as the name and the last one as the dates.
                title = card.select_one("div[data-testid='listing-card-title']").text
                subtitles = card.select("div[data-testid='listing-card-subtitle']")
                first_subtitle = subtitles[0].text
                last_subtitle = subtitles[-1].text
                price = card.select_one("span div span").text
                href = card.find("a").get("href")
                data_pipeline.add_data(
                    SearchData(
                        name=first_subtitle,
                        description=title,
                        dates=last_subtitle,
                        price=price,
                        url=f"https://www.airbnb.com{href}",
                    )
                )
            logger.info(f"Successfully parsed data from: {url}")
            return
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-attempt}")
    raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape every search page in `url_list` on a pool of worker threads.

    Each URL is paired with the same location, pipeline and retry budget and
    handed to scrape_search_results(). The iterator returned by map() is not
    consumed, so an exception raised inside a worker is not re-raised here.
    """
    page_count = len(url_list)
    # map() takes one iterable per positional argument of the worker.
    locations = [location] * page_count
    pipelines = [data_pipeline] * page_count
    retry_budgets = [retries] * page_count

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(scrape_search_results, url_list, locations, pipelines, retry_budgets)
if __name__ == "__main__":
    # Crawl settings.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with separators dashed.
        filename = keyword.replace(", ", "-").replace(" ", "-")
        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        # Flush whatever is still queued before moving on.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info("Crawl complete.")
Now that we've got concurrency, we just need to integrate with a proxy and we'll be ready for production.
Step 5: Bypassing Anti-Bots
We'll use a special function to get past anti-bots. It takes a URL and a location, combines them with our API key and some additional parameters, and returns a ScrapeOps proxied URL. We're going to call this one `get_scrapeops_url()`.
You can view it below.
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches `url` through `location`.

    The query string carries the module-level API key, the target URL, the
    country code and a fixed "wait" value of 5000.
    """
    query = urlencode(
        {
            "api_key": API_KEY,
            "url": url,
            "country": location,
            "wait": 5000,
        }
    )
    return "https://proxy.scrapeops.io/v1/?" + query
"api_key"
is our ScrapeOps API key."url"
is the url we want to scrape."country"
holds the country we want to be routed through."wait"
tells ScrapeOps to wait a certain amount of time before sending back our result. This allows content to load on the page.
We add it into our parsing function and we're now ready to scrape!
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Read the ScrapeOps API key from config.json (the "api_key" entry).
# API_KEY is an empty string until the file has been loaded.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches `url` through `location`.

    The query string carries the module-level API key, the target URL, the
    country code and a fixed "wait" value of 5000.
    """
    query = urlencode(
        {
            "api_key": API_KEY,
            "url": url,
            "country": location,
            "wait": 5000,
        }
    )
    return "https://proxy.scrapeops.io/v1/?" + query
## Logging
# Configure INFO-level logging and create the module logger that the
# functions in this script write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One listing card scraped from an Airbnb search results page.

    String fields are normalized on construction: surrounding whitespace is
    stripped and blank values are replaced with "No <field name>".
    """

    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only text also counts as missing.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
class DataPipeline:
    """Queue scraped dataclass items, drop repeated names, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []  # names already accepted; used to spot duplicates
        self.storage_queue = []  # items waiting to be written
        self.storage_queue_limit = storage_queue_limit  # queue size that triggers a flush
        self.csv_filename = csv_filename
        self.csv_file_open = False  # True only while save_to_csv() is writing

    def save_to_csv(self):
        """Empty the queue into the CSV file, adding a header row if the file is new or empty."""
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write: leave the flag alone so it cannot get stuck on True.
            return

        self.csv_file_open = True
        try:
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Clear the flag even when the write fails.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if this item's name was seen before; otherwise record the name."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued; call once scraping has finished."""
        # Imported here because the script's import block does not include `time`.
        import time

        if self.csv_file_open:
            # Give an in-progress write a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def find_pagination_urls(keyword, location, pages=4, retries=3):
    """Return the search-result page URLs for `keyword`, fetched via the proxy.

    The first search page is always included; further URLs come from the
    pagination bar links labelled "1" to "4", capped at `pages` URLs in total.

    Raises:
        Exception: When every attempt has failed.
    """
    slug = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{slug}/homes"
    numbered_pages = ["1", "2", "3", "4"]

    for attempt in range(retries + 1):
        try:
            # Route the request through ScrapeOps for the requested country.
            response = requests.get(get_scrapeops_url(url, location=location))
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            nav = page.select_one("nav[aria-label='Search results pagination']")
            anchors = nav.find_all("a")
            links = [url]
            for anchor in anchors:
                if len(links) >= pages:
                    break
                if anchor.text in numbered_pages:
                    href = anchor.get("href")
                    links.append(f"https://www.airbnb.com{href}")
            return links
        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - attempt}")
            logger.warning(f"Exception: {e}")
    raise Exception("Failed to find pagination, max retries exceeded!")
def scrape_search_results(url, location, data_pipeline=None, retries=3):
    """Fetch one Airbnb search page via the proxy and feed each card to the pipeline.

    Args:
        url: Search results page to download.
        location: Country code passed to get_scrapeops_url().
        data_pipeline: Object whose add_data() receives one SearchData per card.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When every attempt has failed.
    """
    for attempt in range(retries + 1):
        try:
            response = requests.get(get_scrapeops_url(url, location=location))
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            for card in page.select("div[data-testid='card-container']"):
                # The card title is stored as the description; the first
                # subtitle is used as the name and the last one as the dates.
                title = card.select_one("div[data-testid='listing-card-title']").text
                subtitles = card.select("div[data-testid='listing-card-subtitle']")
                first_subtitle = subtitles[0].text
                last_subtitle = subtitles[-1].text
                price = card.select_one("span div span").text
                href = card.find("a").get("href")
                data_pipeline.add_data(
                    SearchData(
                        name=first_subtitle,
                        description=title,
                        dates=last_subtitle,
                        price=price,
                        url=f"https://www.airbnb.com{href}",
                    )
                )
            logger.info(f"Successfully parsed data from: {url}")
            return
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-attempt}")
    raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape every search page in `url_list` on a pool of worker threads.

    Each URL is paired with the same location, pipeline and retry budget and
    handed to scrape_search_results(). The iterator returned by map() is not
    consumed, so an exception raised inside a worker is not re-raised here.
    """
    page_count = len(url_list)
    # map() takes one iterable per positional argument of the worker.
    locations = [location] * page_count
    pipelines = [data_pipeline] * page_count
    retry_budgets = [retries] * page_count

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(scrape_search_results, url_list, locations, pipelines, retry_budgets)
if __name__ == "__main__":
    # Crawl settings.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with separators dashed.
        filename = keyword.replace(", ", "-").replace(" ", "-")
        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        # Flush whatever is still queued before moving on.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info("Crawl complete.")
Step 6: Production Run
Here is our final `main`. Feel free to change `MAX_THREADS`, `MAX_RETRIES`, `PAGES`, `LOCATION`, or `keyword_list` if you'd like to adjust your results. We're going to set `PAGES` to 4. That gives the maximum number of pages our pagination scraper can return.
if __name__ == "__main__":
    # Crawl settings; PAGES is at the pagination scraper's maximum.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 4
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with separators dashed.
        filename = keyword.replace(", ", "-").replace(" ", "-")
        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        # Flush whatever is still queued before moving on.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info("Crawl complete.")
Our crawl finished in 24.595 seconds.
If you remember, we have `wait` set to 5 seconds, so we spent at least 5 seconds waiting for the pagination scrape. 24.595 - 5 = 19.595 seconds spent actually crawling. 19.595 seconds / 4 pages ≈ 4.90 seconds per page.
Build An Airbnb Scraper
Now that we're successfully crawling, we're going to build our scraper.
- Our scraper needs to read a CSV, then parse each individual listing from the CSV file.
- After parsing a listing, it should store the extracted data in a new CSV.
- It should do all of this concurrently for speed and efficiency.
- This scraper should also integrate with a proxy to avoid getting blocked.
Step 1: Create Simple Business Data Parser
Let's get started by creating our parsing function. We start by finding all the review cards using their CSS selector, `soup.select("div[role='listitem']")`.
Once we have these cards, we iterate through them. On each card, we pull the `name`, `stars`, and `review`. These objects are the data we want to store for later review.
def process_listing(row, location, retries=3):
    """Download one listing page and print every review found on it.

    Args:
        row: Mapping with a "url" key (one row read from the crawl CSV).
        location: Not used by this version of the function.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When every attempt has failed.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request is made inside the try so that connection errors are
            # retried like any other failure instead of escaping the loop.
            response = requests.get(url)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            for review_card in soup.select("div[role='listitem']"):
                name = review_card.find("h3").text
                # The number of <svg> elements in the card is used as the star count.
                stars = len(review_card.find_all("svg"))
                # The last <span> in the card is taken as the review text.
                review = review_card.find_all("span")[-1].text
                review_data = {
                    "name": name,
                    "stars": stars,
                    "review": review,
                }
                print(review_data)
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    logger.info(f"Successfully parsed: {row['url']}")
- `review_cards = soup.select("div[role='listitem']")` finds our list of review cards.
- In each of our reviews, we pull the following:
  - `name`
  - `stars`
  - `review`
Step 2: Loading URLs To Scrape
To use our parsing function, we need to feed it a url. Here, we're going to make a new function similar to `start_scrape()`. The main difference is that this one will first read the CSV file before calling the parsing function.
Here is `process_results()`. First, we open and read our CSV file into an array, `reader`. After we've got our array, we iterate through it and call `process_listing()`.
def process_results(csv_file, location, retries=3):
    """Read the crawl CSV and run process_listing() on each of its rows.

    Args:
        csv_file: Path of the CSV written by the crawl step.
        location: Forwarded to process_listing().
        retries: Retry budget forwarded to process_listing().
    """
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        # Load every row up front, then process them one at a time.
        rows = list(csv.DictReader(file))
        for listing_row in rows:
            process_listing(listing_row, location, retries=retries)
You can view our full code up to this point below.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Read the ScrapeOps API key from config.json (the "api_key" entry).
# API_KEY is an empty string until the file has been loaded.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches `url` through `location`.

    The query string carries the module-level API key, the target URL, the
    country code and a fixed "wait" value of 5000.
    """
    query = urlencode(
        {
            "api_key": API_KEY,
            "url": url,
            "country": location,
            "wait": 5000,
        }
    )
    return "https://proxy.scrapeops.io/v1/?" + query
## Logging
# Configure INFO-level logging and create the module logger that the
# functions in this script write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One listing card scraped from an Airbnb search results page.

    String fields are normalized on construction: surrounding whitespace is
    stripped and blank values are replaced with "No <field name>".
    """

    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only text also counts as missing.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
class DataPipeline:
    """Queue scraped dataclass items, drop repeated names, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []  # names already accepted; used to spot duplicates
        self.storage_queue = []  # items waiting to be written
        self.storage_queue_limit = storage_queue_limit  # queue size that triggers a flush
        self.csv_filename = csv_filename
        self.csv_file_open = False  # True only while save_to_csv() is writing

    def save_to_csv(self):
        """Empty the queue into the CSV file, adding a header row if the file is new or empty."""
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write: leave the flag alone so it cannot get stuck on True.
            return

        self.csv_file_open = True
        try:
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Clear the flag even when the write fails.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if this item's name was seen before; otherwise record the name."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued; call once scraping has finished."""
        # Imported here because the script's import block does not include `time`.
        import time

        if self.csv_file_open:
            # Give an in-progress write a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def find_pagination_urls(keyword, location, pages=4, retries=3):
    """Return the search-result page URLs for `keyword`, fetched via the proxy.

    The first search page is always included; further URLs come from the
    pagination bar links labelled "1" to "4", capped at `pages` URLs in total.

    Raises:
        Exception: When every attempt has failed.
    """
    slug = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{slug}/homes"
    numbered_pages = ["1", "2", "3", "4"]

    for attempt in range(retries + 1):
        try:
            # Route the request through ScrapeOps for the requested country.
            response = requests.get(get_scrapeops_url(url, location=location))
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            nav = page.select_one("nav[aria-label='Search results pagination']")
            anchors = nav.find_all("a")
            links = [url]
            for anchor in anchors:
                if len(links) >= pages:
                    break
                if anchor.text in numbered_pages:
                    href = anchor.get("href")
                    links.append(f"https://www.airbnb.com{href}")
            return links
        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - attempt}")
            logger.warning(f"Exception: {e}")
    raise Exception("Failed to find pagination, max retries exceeded!")
def scrape_search_results(url, location, data_pipeline=None, retries=3):
    """Fetch one Airbnb search page via the proxy and feed each card to the pipeline.

    Args:
        url: Search results page to download.
        location: Country code passed to get_scrapeops_url().
        data_pipeline: Object whose add_data() receives one SearchData per card.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When every attempt has failed.
    """
    for attempt in range(retries + 1):
        try:
            response = requests.get(get_scrapeops_url(url, location=location))
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            for card in page.select("div[data-testid='card-container']"):
                # The card title is stored as the description; the first
                # subtitle is used as the name and the last one as the dates.
                title = card.select_one("div[data-testid='listing-card-title']").text
                subtitles = card.select("div[data-testid='listing-card-subtitle']")
                first_subtitle = subtitles[0].text
                last_subtitle = subtitles[-1].text
                price = card.select_one("span div span").text
                href = card.find("a").get("href")
                data_pipeline.add_data(
                    SearchData(
                        name=first_subtitle,
                        description=title,
                        dates=last_subtitle,
                        price=price,
                        url=f"https://www.airbnb.com{href}",
                    )
                )
            logger.info(f"Successfully parsed data from: {url}")
            return
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-attempt}")
    raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape every search page in `url_list` on a pool of worker threads.

    Each URL is paired with the same location, pipeline and retry budget and
    handed to scrape_search_results(). The iterator returned by map() is not
    consumed, so an exception raised inside a worker is not re-raised here.
    """
    page_count = len(url_list)
    # map() takes one iterable per positional argument of the worker.
    locations = [location] * page_count
    pipelines = [data_pipeline] * page_count
    retry_budgets = [retries] * page_count

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(scrape_search_results, url_list, locations, pipelines, retry_budgets)
def process_listing(row, location, retries=3):
    """Download one listing page and print every review found on it.

    Args:
        row: Mapping with a "url" key (one row read from the crawl CSV).
        location: Not used by this version of the function.
        retries: Extra attempts allowed after the first failure.

    Raises:
        Exception: When every attempt has failed.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request is made inside the try so that connection errors are
            # retried like any other failure instead of escaping the loop.
            response = requests.get(url)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            for review_card in soup.select("div[role='listitem']"):
                name = review_card.find("h3").text
                # The number of <svg> elements in the card is used as the star count.
                stars = len(review_card.find_all("svg"))
                # The last <span> in the card is taken as the review text.
                review = review_card.find_all("span")[-1].text
                review_data = {
                    "name": name,
                    "stars": stars,
                    "review": review,
                }
                print(review_data)
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
    """Read the crawl CSV and run process_listing() on each of its rows.

    Args:
        csv_file: Path of the CSV written by the crawl step.
        location: Forwarded to process_listing().
        retries: Retry budget forwarded to process_listing().
    """
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        # Load every row up front, then process them one at a time.
        rows = list(csv.DictReader(file))
        for listing_row in rows:
            process_listing(listing_row, location, retries=retries)
if __name__ == "__main__":
    # Crawl settings.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["Myrtle Beach, South Carolina, United States"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with separators dashed.
        filename = keyword.replace(", ", "-").replace(" ", "-")
        page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(page_urls, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        # Flush whatever is still queued before moving on.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info("Crawl complete.")

    # Second stage: visit every listing recorded in each crawl CSV.
    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
- We now read our CSV into an array.
- After creating the array, we iterate through it and call `process_listing()` on each row from the CSV file.
Step 3: Storing the Scraped Data
At this point, storing our data is really simple. We already have our `DataPipeline`; we just need to feed it a new `dataclass`. This one will represent the review objects we've been parsing in the examples above. We'll call our new `dataclass` `ReviewData`.
Here is our new `ReviewData` class.
@dataclass
class ReviewData:
    """One review scraped from an Airbnb listing page.

    String fields are normalized on construction: surrounding whitespace is
    stripped and blank values are replaced with "No <field name>". The
    integer `stars` field is left untouched.
    """

    name: str = ""
    stars: int = 0
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only text also counts as missing.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
You can view our fully updated code below.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Read the ScrapeOps API key from config.json (the "api_key" entry).
# API_KEY is an empty string until the file has been loaded.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches `url` through `location`.

    The query string carries the module-level API key, the target URL, the
    country code and a fixed "wait" value of 5000.
    """
    query = urlencode(
        {
            "api_key": API_KEY,
            "url": url,
            "country": location,
            "wait": 5000,
        }
    )
    return "https://proxy.scrapeops.io/v1/?" + query
## Logging
# Configure INFO-level logging and create the module logger that the
# functions in this script write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One listing card scraped from an Airbnb search results page.

    String fields are normalized on construction: surrounding whitespace is
    stripped and blank values are replaced with "No <field name>".
    """

    name: str = ""
    description: str = ""
    dates: str = ""
    price: str = ""
    url: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only text also counts as missing.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
@dataclass
class ReviewData:
    """One review scraped from an Airbnb listing page.

    String fields are normalized on construction: surrounding whitespace is
    stripped and blank values are replaced with "No <field name>". The
    integer `stars` field is left untouched.
    """

    name: str = ""
    stars: int = 0
    review: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only text also counts as missing.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
class DataPipeline:
    """Queue scraped dataclass items, drop repeated names, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []  # names already accepted; used to spot duplicates
        self.storage_queue = []  # items waiting to be written
        self.storage_queue_limit = storage_queue_limit  # queue size that triggers a flush
        self.csv_filename = csv_filename
        self.csv_file_open = False  # True only while save_to_csv() is writing

    def save_to_csv(self):
        """Empty the queue into the CSV file, adding a header row if the file is new or empty."""
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write: leave the flag alone so it cannot get stuck on True.
            return

        self.csv_file_open = True
        try:
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Clear the flag even when the write fails.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if this item's name was seen before; otherwise record the name."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue reaches its limit."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued; call once scraping has finished."""
        # Imported here because the script's import block does not include `time`.
        import time

        if self.csv_file_open:
            # Give an in-progress write a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def find_pagination_urls(keyword, location, pages=4, retries=3):
    """Return the search-result page URLs for `keyword`, fetched via the proxy.

    The first search page is always included; further URLs come from the
    pagination bar links labelled "1" to "4", capped at `pages` URLs in total.

    Raises:
        Exception: When every attempt has failed.
    """
    slug = keyword.replace(", ", "--").replace(" ", "-")
    url = f"https://www.airbnb.com/s/{slug}/homes"
    numbered_pages = ["1", "2", "3", "4"]

    for attempt in range(retries + 1):
        try:
            # Route the request through ScrapeOps for the requested country.
            response = requests.get(get_scrapeops_url(url, location=location))
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            page = BeautifulSoup(response.text, "html.parser")
            nav = page.select_one("nav[aria-label='Search results pagination']")
            anchors = nav.find_all("a")
            links = [url]
            for anchor in anchors:
                if len(links) >= pages:
                    break
                if anchor.text in numbered_pages:
                    href = anchor.get("href")
                    links.append(f"https://www.airbnb.com{href}")
            return links
        except Exception as e:
            logger.warning(f"Failed to fetch page list for {url} tries left {retries - attempt}")
            logger.warning(f"Exception: {e}")
    raise Exception("Failed to find pagination, max retries exceeded!")
def scrape_search_results(url, location, data_pipeline=None, retries=3):
    """Parse one search-results page and feed each listing to the pipeline.

    The page is fetched through the ScrapeOps proxy. Every listing card is
    turned into a ``SearchData`` record and handed to
    ``data_pipeline.add_data``. Any exception during an attempt triggers a
    retry of the whole page.

    Args:
        url: Search-results page to scrape.
        location: Country code forwarded to ``get_scrapeops_url``.
        data_pipeline: Object whose ``add_data`` receives each record.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: When every attempt fails.
    """
    for tries in range(retries + 1):
        try:
            response = requests.get(get_scrapeops_url(url, location=location))
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            for card in soup.select("div[data-testid='card-container']"):
                title = card.select_one("div[data-testid='listing-card-title']").text
                subtitles = card.select("div[data-testid='listing-card-subtitle']")
                # First subtitle is stored as the name, last one as the dates.
                listing_name = subtitles[0].text
                listing_dates = subtitles[-1].text
                listing_price = card.select_one("span div span").text
                listing_url = f"https://www.airbnb.com{card.find('a').get('href')}"

                data_pipeline.add_data(
                    SearchData(
                        name=listing_name,
                        description=title,
                        dates=listing_dates,
                        price=listing_price,
                        url=listing_url,
                    )
                )

            logger.info(f"Successfully parsed data from: {url}")
            return
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")

    raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(url_list, location, data_pipeline=None, max_threads=5, retries=3):
    """Scrape every search-result page in ``url_list`` on a thread pool.

    Each URL is handed to ``scrape_search_results`` with the same
    ``location``, ``data_pipeline`` and ``retries``. A page that still
    fails after its retries is logged and skipped, so one bad page does not
    stop the others.

    Args:
        url_list: Iterable of search-result page URLs.
        location: Country code passed through to each worker.
        data_pipeline: Pipeline shared by all workers.
        max_threads: Maximum number of worker threads.
        retries: Retry budget passed through to each worker.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(
                scrape_search_results, url, location, data_pipeline, retries
            ): url
            for url in url_list
        }
        # Inspect every future: an exception raised in a worker is otherwise
        # never surfaced and the failed page would go unnoticed.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logger.error(f"Giving up on {futures[future]}: {error}")
def process_listing(row, location, retries=3):
    """Scrape the reviews of one listing into a CSV named after the listing.

    The listing page at ``row["url"]`` is fetched directly. Each review
    card becomes a ``ReviewData`` record written through a fresh
    ``DataPipeline`` whose filename is ``row["name"]`` with spaces replaced
    by dashes. Any exception during an attempt triggers a retry.

    Args:
        row: Mapping with at least the keys ``"url"`` and ``"name"``.
        location: Accepted for interface parity; not used by this function.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: When every attempt fails.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request is made inside the try so that connection errors
            # are retried like any other failure, not propagated straight
            # to the caller.
            response = requests.get(url)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            review_cards = soup.select("div[role='listitem']")
            review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

            for review_card in review_cards:
                name = review_card.find("h3").text
                # The star rating is taken as the number of <svg> icons.
                stars = len(review_card.find_all("svg"))
                # The review text is read from the last <span> in the card.
                spans = review_card.find_all("span")
                review = spans[-1].text

                review_data = ReviewData(
                    name=name,
                    stars=stars,
                    review=review,
                )
                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
    """Run ``process_listing`` for every row of a crawl CSV.

    Args:
        csv_file: Path to a CSV file with a header row.
        location: Passed through to ``process_listing``.
        retries: Retry budget passed through to ``process_listing``.
    """
    logger.info(f"processing {csv_file}")
    # Read with an explicit UTF-8 encoding to match how the pipeline in this
    # file writes its CSVs; the platform default may differ and mis-decode
    # non-ASCII listing text.
    with open(csv_file, newline="", encoding="utf-8") as file:
        rows = list(csv.DictReader(file))

    # All rows are already in memory, so the file is closed before the
    # per-listing scraping starts.
    for row in rows:
        process_listing(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["Myrtle Beach, South Carolina, United States"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(", ", "-").replace(" ", "-")
page_urls = find_pagination_urls(keyword, LOCATION, pages=PAGES, retries=MAX_RETRIES)
crawl_pipeline = DataPipeline(csv_filename