How to Scrape Indeed With Requests and BeautifulSoup
If you're looking for a job, Indeed is a great place to start. Since its founding in 2004, Indeed has held job postings from all over the world. With this widespread adoption, we get a very widespread dataset to work with. Indeed holds so many job listings that it's virtually impossible to get through them all. This is where scraping comes in very handy.
Today, we'll learn how to build an Indeed scraper that can scrape Indeed and produce reports on these jobs.
- TLDR: How to Scrape Indeed
- How To Architect Our Scraper
- Understanding How To Scrape Indeed
- Setting Up Our Indeed Scraper
- Build An Indeed Search Crawler
- Build An Indeed Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
TLDR - How to Scrape Indeed
If you need to scrape Indeed but you don't have time for a tutorial, use the scraper below. Simply create a new project folder and add this file, and a config.json
file with your API key. Once that's done it's as simple as python name_of_your_script.py
.
You'll get a crawler file from the crawler and an individual report on each job that was found in the crawler.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str = ""
salary: str = ""
description: str = ""
benefits: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
foramtted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={foramtted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.select_one("h2").text
parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.select_one("span[data-testid='company-name']").text
rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text
location = div_card.select_one("div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_job(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
job_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
soup = BeautifulSoup(response.text, "html.parser")
salary = "n/a"
salary_holder = soup.select_one("div[id='salaryInfoAndJobContainer']")
if salary_holder:
salary = salary_holder.text
description = "n/a"
description_holder = soup.select_one("div[id='jobDescriptionText']")
if description_holder:
description = description_holder.text
benefits = "n/a"
benefits_holder = soup.select_one("div[id='benefits']")
if benefits_holder:
benefits = benefits_holder.text
job_data = JobData(
name=row["name"],
salary=salary,
description=description,
benefits=benefits
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_job,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
When you run this scraper, feel free to change any of the following constants:
MAX_RETRIES
: Defines the maximum number of retries the scraper will attempt if a request fails (e.g., due to a network error or a non-200 HTTP response).MAX_THREADS
: Defines the maximum number of threads that will be used to run the scraper concurrently.PAGES
: Determines the number of search result pages to scrape for each keyword.LOCATION
: Sets the country or region code used in the scraping process.LOCALITY
: Specifies the locality or city used in the search query, which can narrow down the search results to a specific geographical area.keyword_list
: A list of keywords that the scraper will use to search for job postings in Indeed.
How To Architect Our Indeed Scraper
To create our Indeed scraper project, we'll first need to scrape individual job listings from a keyword search. This portion of our scrape is called a crawl. Our crawler will need to perform the following tasks.
- Perform a search and parse the results.
- Paginate results for better control over our data.
- Store important information to a CSV file.
- Steps 1 through 3 will need to be performed concurrently to maximize speed and efficiency.
- Proxy Integration will keep us from getting blocked.
After our crawler is working, it will be generating reports for different jobs. Next, we'll need to get detailed information about each of those jobs. This will be the part where we build our actual scraper. The scraper's job goes as follows:
- Read the crawler's report.
- Parse the individual job results.
- Store these new results in a separate report.
- Concurrently run steps 2 and 3 until the job is finished.
- Integrate with a proxy to avoid anti-bots and anything else that may get in our way.
Understanding How To Scrape Indeed
Before we build this project, we need a better understanding of the data we need to get from Indeed.
Step 1: How To Request Indeed Pages
As with any website, we need to perform a GET request. Go ahead and take a look at our domain:
https://www.indeed.com/jobs?q=writer&l=Westland%2C+MI&start=10&vjk=a88c42edb7b19c5d
Let's break this down a little bit.
https://www.indeed.com/jobs
is the actual endpoint we're hitting on the server.?q=writer&l=Westland%2C+MI&start=10&vjk=a88c42edb7b19c5d
is our string of queries.
Queries provide additional information to our server for fine tuned results. q=writer
tells Indeed that we want to search for writer jobs.
If we want to search for writer jobs without any other criteria, our URL would be:
https://www.indeed.com/jobs?q=writer
You can view Indeed's search page below.
Individual job pages look like this.
Step 2: How To Extract Data From Indeed Results and Pages
When we pull data from Indeed search results, we first need to realize that each result gets embedded within its own div
card. If we can find this card, we can find all the information it holds. Each of these cards has a data-testid
of slider_item
.
On our individual job page, we can find the job description. Just like the example above, our data gets embedded within a div
card. This div
has an id
of "jobDescriptionText"
. This card holds our entire job description.
Step 3: How To Control Pagination
Remember our URL from earlier? Here it is again:
https://www.indeed.com/jobs?q=writer&l=Westland%2C+MI&start=10&vjk=a88c42edb7b19c5d
Take a look at the following query: start=10
On Indeed, each page number is a multiple of 10.
- Page 1 is
start=0
. - Page 2 is
start=10
. - Page 3 is
start=20
.
To paginate our results, we can change the start
parameter.
Step 4: Geolocated Data
To handle geolocation we'll actually need to do two things.
For starters, there is another parameter we need to look at: l=Westland%2C+MI
. l
is the location we'd like to search. If we wanted to search in London, we could pass l=London%2CUK
.
Our other geolocation element actually doesn't involve Indeed at all, but rather ScrapeOps. When we talk to the ScrapeOps API, we can pass in a country
param.
- If we tell ScrapeOps
"country": "us"
, we'll get routed through a server in the US. - If we want to appear in the UK, we could pass
"country": "uk"
.
Setting Up Our Indeed Scraper Project
Let's get started. You can run the following commands to get setup.
Create a New Project Folder
mkdir indeed-scraper
cd indeed-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build An Indeed Search Crawler
Time to start building our crawler. We'll add the following pieces in step-by-step.
- Create a parser
- Add pagination
- Store the parsed data
- Add concurrency
- Add proxy integration
Step 1: Create Simple Search Data Parser
We're going to start with a basic data parser. The goal of our parsing function will be simple. It needs to perform a search, and then extract data from the results.
The code below sets up our basic structure with error handling, retry logic, and of course adds our basic parsing function.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, locality, retries=3):
formatted_keyword = keyword.replace(" ", "+")
foramtted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={foramtted_location}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.select_one("h2").text
parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.select_one("span[data-testid='company-name']").text
rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text
location = div_card.select_one("div[data-testid='text-location']").text
search_data = {
"name": name,
"url": url,
"stars": rating,
"company_name": company_name,
"location": location
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
scrape_search_results(keyword, LOCATION, LOCALITY, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
scrape_search_results()
does the following:
- Make a request to the server
- If we don't get a
status_code
of 200, throw anException
- Find our result cards with
div_cards = soup.select("div[data-testid='slider_item']")
- With each
div_card
, we:- Find the name:
div_card.select_one("h2").text
- Parse the url for the
job_key
- Get the
company_name
- Check for the
rating
and if it's present, save it to therating
variable - Get the
location
:location = div_card.select_one("div[data-testid='text-location']").text
- Find the name:
We now have not only a basic parsing function, but also a set structure for the rest of our code.
Step 2: Add Pagination
Next, we need to add pagination. This is quite simple. We need to slightly change our URL and we also need to create a function that calls scrape_search_results()
on multiple pages.
Our URL now looks like this.
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={foramtted_location}&start={page_number * 10}"
Now, we'll add a start_scrape()
function. It's extremely simple, it just iterates through the pages and calls scrape_search_results()
on each of them.
def start_scrape(keyword, pages, location, locality, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)
Here is our full script up to this point.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, locality, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
foramtted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={foramtted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.select_one("h2").text
parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.select_one("span[data-testid='company-name']").text
rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text
location = div_card.select_one("div[data-testid='text-location']").text
search_data = {
"name": name,
"url": url,
"stars": rating,
"company_name": company_name,
"location": location
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
With pagination, we now have the ability to control our search results. We paginated our url and we've added a function to parse a list of pages. We're now properly fetching and extracting our data. In the next section, we'll add storage for this data.
Step 3: Storing the Scraped Data
To store our data, we're going to write two classes.
- Our first one is a
dataclass
calledSearchData
.- The
SearchData
class is to simply hold data and represent adiv_card
object from our parsing function.
- The
- The second class is a
DataPipeline
.- This opens a pipeline to a CSV file and puts data through the pipeline. It also uses the
name
field to filter out duplicates.
- This opens a pipeline to a CSV file and puts data through the pipeline. It also uses the
Here is our SearchData
class.
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is our DataPipeline
.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
Once we've put it all together, our script looks like this.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
foramtted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={foramtted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.select_one("h2").text
parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.select_one("span[data-testid='company-name']").text
rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text
location = div_card.select_one("div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Key things to take from this section:
SearchData
represents an object from our search results.DataPipeline
pipes ourSearchData
to a CSV.
Step 4: Adding Concurrency
To maximize our speed and efficiency, we need to add concurrency. This will be relatively easy. We'll just refactor start_scrape()
. We'll remove the for
loop and then we'll replace it with ThreadPoolExecutor
.
Here is the finished function.
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
Look at the arguments to executor.map()
:
scrape_search_results
is the function we want to call on each thread.- All other arguments are args that get passed into
scrape_search_results
. We pass them in as arrays, these arrays then get passed intoscrape_search_results
.
Step 5: Bypassing Anti-Bots
Our scraper is almost ready. First, it needs the ability to get past roadblocks. Anti-bots are designed to detect and block malicious bots from accessing a site.
For us to get past these anti-bots (and anything else for that matter), we'll be using the ScrapeOps Proxy API. The function below takes any regular old URL and converts it into a ScrapeOps Proxied URL.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
Take a look at our payload
that gets passed into ScrapeOps:
"api_key"
: you ScrapeOps API key."url"
: the url you'd like to scrape."country"
: the country we want to be routed through."residential"
: a boolean. If we set it toTrue
, ScrapeOps gives us a residential IP address instead of a datacenter IP.
Here is our full code ready for production.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
foramtted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={foramtted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.select_one("h2").text
parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.select_one("span[data-testid='company-name']").text
rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text
location = div_card.select_one("div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Time to run in production and get a feel for our performance. We'll set PAGES
to 3.
Here is our updated main
.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Feel free to tweak your results and change any of the following constants:
MAX_RETRIES
MAX_THREADS
PAGES
LOCATION
LOCALITY
Here are our results for 3 pages.
We scraped 3 pages in ~ 56.6 seconds. This comes out to roughly 18.86 seconds per result page. Depending on your LOCATION
, your hardware, and the speed of your internet connection, results will vary.
Build An Indeed Scraper
Our crawler is now spitting out CSV files. Now, we need to build a scraper that does each of these tasks:
- Read the CSV file.
- Parse the jobs from the CSV file.
- Store the parsed data from each job.
- Parse these pages concurrently.
- Integrate with the ScrapeOps Proxy API.
Step 1: Create Simple Job Data Parser
As usual, we'll get started by writing a basic parsing function. Like before, it holds error handling and retry logic and it also sets the stage for all the future code that we're going to add.
Here is process_job()
.
def process_job(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
salary = "n/a"
salary_holder = soup.select_one("div[id='salaryInfoAndJobContainer']")
if salary_holder:
salary = salary_holder.text
description = "n/a"
description_holder = soup.select_one("div[id='jobDescriptionText']")
if description_holder:
description = description_holder.text
benefits = "n/a"
benefits_holder = soup.select_one("div[id='benefits']")
if benefits_holder:
benefits = benefits_holder.text
job_data = {
"name": row["name"],
"salary": salary,
"description": description,
"benefits": benefits
}
print(job_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
while
we have retries and the operation has not succeeded:
- We check and see if the salary is present:
soup.select_one("div[id='salaryInfoAndJobContainer']")
and if it is, we pull the salary. - Check for a description the same way:
soup.select_one("div[id='jobDescriptionText']")
. - Check for the presence of benefits the same way as well:
soup.select_one("div[id='benefits']")
.
Step 2: Loading URLs To Scrape
In order to use our parsing function, we need to be able to read a CSV file. We're going to create another function, process_results()
. This one starts off pretty similar to start_scrape()
.
Here is process_results()
:
- We read the CSV file into an array.
- We then iterate through the array and call
process_job()
on each of the rows from the array.
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_job(row, location, retries=retries)
Here is our full code up to this point.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
foramtted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={foramtted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.select_one("h2").text
parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.select_one("span[data-testid='company-name']").text
rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text
location = div_card.select_one("div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_job(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
salary = "n/a"
salary_holder = soup.select_one("div[id='salaryInfoAndJobContainer']")
if salary_holder:
salary = salary_holder.text
description = "n/a"
description_holder = soup.select_one("div[id='jobDescriptionText']")
if description_holder:
description = description_holder.text
benefits = "n/a"
benefits_holder = soup.select_one("div[id='benefits']")
if benefits_holder:
benefits = benefits_holder.text
job_data = {
"name": row["name"],
"salary": salary,
"description": description,
"benefits": benefits
}
print(job_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_job(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
We now have a basic structure that we can use to finish building our scraper. process_job()
looks up an individual job posting and parses its information. process_results()
calls process_job()
on every single job saved in our CSV file from the crawl.
Step 3: Storing the Scraped Data
To store our data, we need to create another dataclass
. This one holds information from an individual job page. Take a look at JobData
.
@dataclass
class JobData:
name: str = ""
salary: str = ""
description: str = ""
benefits: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
This class is basically the same as SearchData
. It just holds fewer fields. Now we need to add a DataPipeline
into our parsing function in order to save this information to a file.
Here is our full code up to this point.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str = ""
salary: str = ""
description: str = ""
benefits: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
foramtted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={foramtted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.select_one("h2").text
parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.select_one("span[data-testid='company-name']").text
rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text
location = div_card.select_one("div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_job(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
job_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
soup = BeautifulSoup(response.text, "html.parser")
salary = "n/a"
salary_holder = soup.select_one("div[id='salaryInfoAndJobContainer']")
if salary_holder:
salary = salary_holder.text
description = "n/a"
description_holder = soup.select_one("div[id='jobDescriptionText']")
if description_holder:
description = description_holder.text
benefits = "n/a"
benefits_holder = soup.select_one("div[id='benefits']")
if benefits_holder:
benefits = benefits_holder.text
job_data = JobData(
name=row["name"],
salary=salary,
description=description,
benefits=benefits
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_job(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
We're now fetching and storing the proper data. In the coming sections, it will be time to optimize our scraper.
Step 4: Adding Concurrency
When we added concurrency earlier, we simply refactored a for
loop and replaced it with ThreadPoolExecutor
. We'll be doing exactly this again with process_results()
.
Here is our finalized process_results()
function.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_job,
reader,
[location] * len(reader),
[retries] * len(reader)
)
Just like before, our arguments to executor.map()
are as follows:
process_job
is the function we want to call on every available thread.- All other arguments get passed in as arrays.
Step 5: Bypassing Anti-Bots
Time to bypass anti-bots again. We already have our get_scrapeops_url()
function, we just need to add it into a single line and unlock the power of proxy.
response = requests.get(get_scrapeops_url(url, location=location))
Take a look below. This is what our code looks like now that it's ready to run in production.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str = ""
salary: str = ""
description: str = ""
benefits: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
foramtted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={foramtted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.select_one("h2").text
parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.select_one("span[data-testid='company-name']").text
rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text
location = div_card.select_one("div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_job(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
job_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
soup = BeautifulSoup(response.text, "html.parser")
salary = "n/a"
salary_holder = soup.select_one("div[id='salaryInfoAndJobContainer']")
if salary_holder:
salary = salary_holder.text
description = "n/a"
description_holder = soup.select_one("div[id='jobDescriptionText']")
if description_holder:
description = description_holder.text
benefits = "n/a"
benefits_holder = soup.select_one("div[id='benefits']")
if benefits_holder:
benefits = benefits_holder.text
job_data = JobData(
name=row["name"],
salary=salary,
description=description,
benefits=benefits
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_job,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Let's test this all out in production. Here is our updated main
. Since we know that we can crawl pages at approximately 18.6 seconds oer page, we'll crawl just one page this time.
Once again, feel free to change constants in order to tweak your results.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
We were getting server errors with 5 threads so we switched to 2. This is not uncommon when there are overwhelming requests hitting a server. With 2 threads, we completed the crawl and the scrape in 194.38 seconds.
If you remember earlier our crawl finished with 18.86 seconds per page. 194.38 - 18.86 = 175.52 seconds. In total, we scraped 15 separate job postings.
175.52 seconds / 15 jobs = 11.7 seconds per job.
Legal and Ethical Considerations
Whenever you scrape a website, you are subject to both their Terms of Service and their robots.txt
.
It's important to note that most sites can suspend or even permanently ban you for violating their terms.
On another note, when scraping the web, public data is generally fair game. If you don't have to login to a site to view the data, this is public data.
If your data is gated behind a login, this is generally considered private data. When working with private data, you often need to get permission from the site you're scraping and you can be sued for accessing or disseminating private data.
If you're unsure whether your scraper is legal, consult an attorney.
Conclusion
You've finished the tutorial! You now know how to use Requests and BeautifulSoup. You should have a somewhat decent grasp of CSS selectors and you should have a solid understanding of parsing, pagination, data storage, concurrency, and proxy integration.
If you'd like to know more about the tech stack used in this article, take a look at the links below.
More Python Web Scraping Guides
Here at ScrapeOps, we've got a ton of learning material. Whether you're building your first ever scraper, or you've been scraping for years, we've got something for you.
Check out our Python Web Scraping Playbook! If you're interested in more of our "How To Scrape" series, check out the articles below!