

How to Scrape Indeed With Requests and BeautifulSoup

If you're looking for a job, Indeed is a great place to start. Since its founding in 2004, Indeed has hosted job postings from all over the world, which gives us an enormous dataset to work with. Indeed holds so many job listings that it's virtually impossible to get through them all manually. This is where scraping comes in very handy.

Today, we'll learn how to build an Indeed scraper that crawls search results and produces a detailed report on each job it finds.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Indeed

If you need to scrape Indeed but you don't have time for a tutorial, use the scraper below. Simply create a new project folder and add this file along with a config.json file containing your ScrapeOps API key. Once that's done, running it is as simple as python name_of_your_script.py.
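Your config.json file only needs to hold your ScrapeOps API key (the value below is just a placeholder):

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}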

You'll get a CSV report from the crawler and an individual CSV report for each job found during the crawl.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    stars: float = None
    company_name: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

@dataclass
class JobData:
    name: str = ""
    salary: str = ""
    description: str = ""
    benefits: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_locality}&start={page_number * 10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True

            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data

            soup = BeautifulSoup(response.text, "html.parser")

            div_cards = soup.select("div[data-testid='slider_item']")

            for div_card in div_cards:

                name = div_card.select_one("h2").text

                parsed_url = urlparse(div_card.find("a").get("href"))
                query_params = parse_qs(parsed_url.query)
                has_job_key = "jk" in query_params.keys()
                if not has_job_key:
                    continue
                job_key = query_params["jk"][0]
                url = f"https://www.indeed.com/viewjob?jk={job_key}"

                company_name = div_card.select_one("span[data-testid='company-name']").text

                rating = None
                rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
                if rating_holder:
                    rating = rating_holder.text

                location = div_card.select_one("div[data-testid='text-location']").text

                search_data = SearchData(
                    name=name,
                    url=url,
                    stars=rating,
                    company_name=company_name,
                    location=location
                )

                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_job(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                job_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
                soup = BeautifulSoup(response.text, "html.parser")

                salary = "n/a"
                salary_holder = soup.select_one("div[id='salaryInfoAndJobContainer']")
                if salary_holder:
                    salary = salary_holder.text
                description = "n/a"
                description_holder = soup.select_one("div[id='jobDescriptionText']")
                if description_holder:
                    description = description_holder.text
                benefits = "n/a"
                benefits_holder = soup.select_one("div[id='benefits']")
                if benefits_holder:
                    benefits = benefits_holder.text

                job_data = JobData(
                    name=row["name"],
                    salary=salary,
                    description=description,
                    benefits=benefits
                )

                job_pipeline.add_data(job_data)
                job_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_job,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 2
    PAGES = 1
    LOCATION = "us"
    LOCALITY = "Westland MI"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["writer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

When you run this scraper, feel free to change any of the following constants:

  • MAX_RETRIES: Defines the maximum number of retries the scraper will attempt if a request fails (e.g., due to a network error or a non-200 HTTP response).
  • MAX_THREADS: Defines the maximum number of threads that will be used to run the scraper concurrently.
  • PAGES: Determines the number of search result pages to scrape for each keyword.
  • LOCATION: Sets the country or region code used in the scraping process.
  • LOCALITY: Specifies the locality or city used in the search query, which can narrow down the search results to a specific geographical area.
  • keyword_list: A list of keywords that the scraper will use to search for job postings on Indeed.

How To Architect Our Indeed Scraper

To create our Indeed scraper project, we'll first need to scrape individual job listings from a keyword search. This portion of our scrape is called a crawl. Our crawler will need to perform the following tasks.

  1. Perform a search and parse the results.
  2. Paginate results for better control over our data.
  3. Store important information to a CSV file.
  4. Steps 1 through 3 will need to be performed concurrently to maximize speed and efficiency.
  5. Proxy Integration will keep us from getting blocked.

Once our crawler is working, it will generate a report of job listings. Next, we'll need to get detailed information about each of those jobs. This is the part where we build our actual scraper. The scraper's job goes as follows:

  1. Read the crawler's report.
  2. Parse the individual job results.
  3. Store these new results in a separate report.
  4. Concurrently run steps 2 and 3 until the job is finished.
  5. Integrate with a proxy to avoid anti-bots and anything else that may get in our way.

Understanding How To Scrape Indeed

Before we build this project, we need a better understanding of the data we need to get from Indeed.

Step 1: How To Request Indeed Pages

As with any website, we need to perform a GET request. Go ahead and take a look at a typical Indeed search URL:

https://www.indeed.com/jobs?q=writer&l=Westland%2C+MI&start=10&vjk=a88c42edb7b19c5d

Let's break this down a little bit.

  • https://www.indeed.com/jobs is the actual endpoint we're hitting on the server.
  • ?q=writer&l=Westland%2C+MI&start=10&vjk=a88c42edb7b19c5d is our query string.

Query parameters provide additional information to the server for fine-tuned results. q=writer tells Indeed that we want to search for writer jobs.

If we want to search for writer jobs without any other criteria, our URL would be:

https://www.indeed.com/jobs?q=writer
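If you'd rather not piece query strings together by hand, urlencode() from Python's standard library will build them for you. Here's a quick sketch using the same example values as above:

from urllib.parse import urlencode

# Example values -- swap in whatever keyword and locality you're searching for
params = {
    "q": "writer",
    "l": "Westland, MI",
    "start": 10
}

# urlencode() handles the special characters (%2C for the comma, + for the space)
search_url = "https://www.indeed.com/jobs?" + urlencode(params)
print(search_url)
# https://www.indeed.com/jobs?q=writer&l=Westland%2C+MI&start=10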

You can view Indeed's search page below.

Indeed Search Job Page

Individual job pages look like this.

Indeed Job Page


Step 2: How To Extract Data From Indeed Results and Pages

When we pull data from Indeed search results, we first need to realize that each result gets embedded within its own div card. If we can find this card, we can find all the information it holds. Each of these cards has a data-testid of slider_item.

Indeed Search Page HTML Inspection

On our individual job page, we can find the job description. Just like the example above, our data gets embedded within a div card. This div has an id of "jobDescriptionText". This card holds our entire job description.

Indeed Individual Job Page HTML Inspection
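To make those selectors concrete, here's a tiny BeautifulSoup sketch using stand-in HTML (real Indeed pages obviously contain far more markup than this):

from bs4 import BeautifulSoup

# Stand-ins for a real search results page and a real job page
search_html = '<div data-testid="slider_item"><h2>Technical Writer</h2></div>'
job_html = '<div id="jobDescriptionText">Write documentation all day.</div>'

# Each search result lives inside its own slider_item card
search_soup = BeautifulSoup(search_html, "html.parser")
for card in search_soup.select("div[data-testid='slider_item']"):
    print(card.select_one("h2").text)

# The full description on an individual job page
job_soup = BeautifulSoup(job_html, "html.parser")
print(job_soup.select_one("div[id='jobDescriptionText']").text)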


Step 3: How To Control Pagination

Remember our URL from earlier? Here it is again:

https://www.indeed.com/jobs?q=writer&l=Westland%2C+MI&start=10&vjk=a88c42edb7b19c5d

Take a look at the following parameter: start=10

On Indeed, each page holds 10 results, so each page number corresponds to a start offset that is a multiple of 10.

  • Page 1 is start=0.
  • Page 2 is start=10.
  • Page 3 is start=20.

To paginate our results, we can change the start parameter.
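Here's a quick sketch of that page-to-offset math, the same formula our scraper uses later on:

# page 0 -> start=0, page 1 -> start=10, page 2 -> start=20, ...
for page_number in range(3):
    url = f"https://www.indeed.com/jobs?q=writer&start={page_number * 10}"
    print(url)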


Step 4: Geolocated Data

To handle geolocation we'll actually need to do two things.

For starters, there is another parameter we need to look at: l=Westland%2C+MI. l is the location we'd like to search. If we wanted to search in London, we could pass l=London%2CUK.

Our other geolocation element actually doesn't involve Indeed at all, but rather ScrapeOps. When we talk to the ScrapeOps API, we can pass in a country param, as sketched after the list below.

  • If we tell ScrapeOps "country": "us", we'll get routed through a server in the US.
  • If we want to appear in the UK, we could pass "country": "uk".
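Here's a trimmed-down sketch of the proxy helper we'll write later in the tutorial. Only the country part matters here; the full version (with the residential option) appears in the anti-bot step.

from urllib.parse import urlencode

API_KEY = "YOUR-SCRAPEOPS-API-KEY"  # placeholder

def get_scrapeops_url(url, location="us"):
    # "country" tells ScrapeOps which region to route the request through
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Routed through the US vs. the UK
print(get_scrapeops_url("https://www.indeed.com/jobs?q=writer", location="us"))
print(get_scrapeops_url("https://www.indeed.com/jobs?q=writer", location="uk"))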

Setting Up Our Indeed Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir indeed-scraper

cd indeed-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build An Indeed Search Crawler

Time to start building our crawler. We'll add the following pieces in step-by-step.

  1. Create a parser
  2. Add pagination
  3. Store the parsed data
  4. Add concurrency
  5. Add proxy integration

Step 1: Create Simple Search Data Parser

We're going to start with a basic data parser. The goal of our parsing function will be simple. It needs to perform a search, and then extract data from the results.

The code below sets up our basic structure with error handling, retry logic, and of course adds our basic parsing function.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_locality}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.select("div[data-testid='slider_item']")


for div_card in div_cards:

name = div_card.select_one("h2").text

parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"

company_name = div_card.select_one("span[data-testid='company-name']").text

rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text

location = div_card.select_one("div[data-testid='text-location']").text


search_data = {
"name": name,
"url": url,
"stars": rating,
"company_name": company_name,
"location": location
}

print(search_data)


logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

scrape_search_results(keyword, LOCATION, LOCALITY, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

scrape_search_results() does the following:

  • Make a request to the server
  • If we don't get a status_code of 200, throw an Exception
  • Find our result cards with div_cards = soup.select("div[data-testid='slider_item']")
  • With each div_card, we:
    • Find the name: div_card.select_one("h2").text
    • Parse the url for the job_key (see the short sketch after this list)
    • Get the company_name
    • Check for the rating and if it's present, save it to the rating variable
    • Get the location: location = div_card.select_one("div[data-testid='text-location']").text
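The job key step is the least obvious one, so here's a short standalone sketch of what it does. The href value is a made-up example in the same shape as the links on Indeed's result cards:

from urllib.parse import urlparse, parse_qs

# A made-up href in the same shape as the ones on Indeed's result cards
href = "/rc/clk?jk=a88c42edb7b19c5d&from=serp"

query_params = parse_qs(urlparse(href).query)
if "jk" in query_params:
    job_key = query_params["jk"][0]
    print(f"https://www.indeed.com/viewjob?jk={job_key}")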

We now have not only a basic parsing function, but also a set structure for the rest of our code.


Step 2: Add Pagination

Next, we need to add pagination. This is quite simple. We need to slightly change our URL and we also need to create a function that calls scrape_search_results() on multiple pages.

Our URL now looks like this.

url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={foramtted_location}&start={page_number * 10}"

Now, we'll add a start_scrape() function. It's extremely simple: it just iterates through the pages and calls scrape_search_results() on each of them.

def start_scrape(keyword, pages, location, locality, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, locality, page, retries=retries)

Here is our full script up to this point.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_locality}&start={page_number * 10}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.select("div[data-testid='slider_item']")


for div_card in div_cards:

name = div_card.select_one("h2").text

parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"

company_name = div_card.select_one("span[data-testid='company-name']").text

rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text

location = div_card.select_one("div[data-testid='text-location']").text


search_data = {
"name": name,
"url": url,
"stars": rating,
"company_name": company_name,
"location": location
}

print(search_data)


logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

With pagination, we now have the ability to control our search results. We've parameterized our URL and added a function to crawl a list of pages. We're now properly fetching and extracting our data. In the next section, we'll add storage for this data.


Step 3: Storing the Scraped Data

To store our data, we're going to write two classes.

  1. Our first one is a dataclass called SearchData.
    • The SearchData class simply holds data and represents a single result card (div_card) from our parsing function.
  2. The second class is a DataPipeline.
    • This opens a pipeline to a CSV file and puts data through the pipeline. It also uses the name field to filter out duplicates.

Here is our SearchData class.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    stars: float = None
    company_name: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Here is our DataPipeline.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

Once we've put it all together, our script looks like this.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_locality}&start={page_number * 10}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.select("div[data-testid='slider_item']")


for div_card in div_cards:

name = div_card.select_one("h2").text

parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"

company_name = div_card.select_one("span[data-testid='company-name']").text

rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text

location = div_card.select_one("div[data-testid='text-location']").text


search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Key things to take from this section:

  • SearchData represents an object from our search results.
  • DataPipeline pipes our SearchData to a CSV.
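To see how the two fit together, here's a tiny usage sketch. It assumes the SearchData and DataPipeline classes above are already defined, and the job values are made up:

pipeline = DataPipeline(csv_filename="example-jobs.csv")

# A made-up result in the same shape our parser produces
pipeline.add_data(SearchData(
    name="Technical Writer",
    url="https://www.indeed.com/viewjob?jk=example",
    stars=4.5,
    company_name="Example Corp",
    location="Westland, MI"
))

# Flushes anything still sitting in the queue out to the CSV file
pipeline.close_pipeline()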

Step 4: Adding Concurrency

To maximize our speed and efficiency, we need to add concurrency. This will be relatively easy. We'll just refactor start_scrape(). We'll remove the for loop and then we'll replace it with ThreadPoolExecutor.

Here is the finished function.

def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

Look at the arguments to executor.map():

  • scrape_search_results is the function we want to call on each thread.
  • All other arguments are args that get passed into scrape_search_results(). We pass them in as lists; executor.map() then takes one element from each list for every call, as the toy example below illustrates.
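Here's a toy example (unrelated to Indeed) showing how executor.map() pairs up those lists, one element from each list per call:

import concurrent.futures

def greet(name, greeting):
    return f"{greeting}, {name}!"

names = ["Ann", "Bob", "Cal"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Calls greet("Ann", "Hello"), greet("Bob", "Hello"), greet("Cal", "Hello") across threads
    results = list(executor.map(greet, names, ["Hello"] * len(names)))

print(results)  # ['Hello, Ann!', 'Hello, Bob!', 'Hello, Cal!']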

Step 5: Bypassing Anti-Bots

Our crawler is almost production-ready. It just needs the ability to get past roadblocks. Anti-bots are designed to detect and block malicious bots from accessing a site.

For us to get past these anti-bots (and anything else for that matter), we'll be using the ScrapeOps Proxy API. The function below takes any regular old URL and converts it into a ScrapeOps Proxied URL.

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

Take a look at our payload that gets passed into ScrapeOps:

  • "api_key": you ScrapeOps API key.
  • "url": the url you'd like to scrape.
  • "country": the country we want to be routed through.
  • "residential": a boolean. If we set it to True, ScrapeOps gives us a residential IP address instead of a datacenter IP.

Here is our full code ready for production.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_locality}&start={page_number * 10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.select("div[data-testid='slider_item']")


for div_card in div_cards:

name = div_card.select_one("h2").text

parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"

company_name = div_card.select_one("span[data-testid='company-name']").text

rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text

location = div_card.select_one("div[data-testid='text-location']").text


search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)




if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

Time to run in production and get a feel for our performance. We'll set PAGES to 3.

Here is our updated main.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "Westland MI"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Feel free to tweak your results and change any of the following constants:

  • MAX_RETRIES
  • MAX_THREADS
  • PAGES
  • LOCATION
  • LOCALITY

Here are our results for 3 pages.

Crawler Performance Terminal

We scraped 3 pages in ~ 56.6 seconds. This comes out to roughly 18.86 seconds per result page. Depending on your LOCATION, your hardware, and the speed of your internet connection, results will vary.


Build An Indeed Scraper

Our crawler is now spitting out CSV files. Now, we need to build a scraper that does each of these tasks:

  1. Read the CSV file.
  2. Parse the jobs from the CSV file.
  3. Store the parsed data from each job.
  4. Parse these pages concurrently.
  5. Integrate with the ScrapeOps Proxy API.

Step 1: Create Simple Job Data Parser

As usual, we'll get started by writing a basic parsing function. Like before, it includes error handling and retry logic, and it sets the stage for the code we'll add later.

Here is process_job().

def process_job(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                salary = "n/a"
                salary_holder = soup.select_one("div[id='salaryInfoAndJobContainer']")
                if salary_holder:
                    salary = salary_holder.text
                description = "n/a"
                description_holder = soup.select_one("div[id='jobDescriptionText']")
                if description_holder:
                    description = description_holder.text
                benefits = "n/a"
                benefits_holder = soup.select_one("div[id='benefits']")
                if benefits_holder:
                    benefits = benefits_holder.text

                job_data = {
                    "name": row["name"],
                    "salary": salary,
                    "description": description,
                    "benefits": benefits
                }

                print(job_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

While we have retries left and the operation hasn't succeeded:

  • We check and see if the salary is present: soup.select_one("div[id='salaryInfoAndJobContainer']") and if it is, we pull the salary.
  • Check for a description the same way: soup.select_one("div[id='jobDescriptionText']").
  • Check for the presence of benefits the same way as well: soup.select_one("div[id='benefits']").

Step 2: Loading URLs To Scrape

In order to use our parsing function, we need to be able to read a CSV file. We're going to create another function, process_results(). This one starts off pretty similar to start_scrape().

process_results() does the following:

  • Read the CSV file into an array.
  • Iterate through the array and call process_job() on each row.

Here it is:

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_job(row, location, retries=retries)

Here is our full code up to this point.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_locality}&start={page_number * 10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.select("div[data-testid='slider_item']")


for div_card in div_cards:

name = div_card.select_one("h2").text

parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"

company_name = div_card.select_one("span[data-testid='company-name']").text

rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text

location = div_card.select_one("div[data-testid='text-location']").text


search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_job(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")

salary = "n/a"
salary_holder = soup.select_one("div[id='salaryInfoAndJobContainer']")
if salary_holder:
salary = salary_holder.text
description = "n/a"
description_holder = soup.select_one("div[id='jobDescriptionText']")
if description_holder:
description = description_holder.text
benefits = "n/a"
benefits_holder = soup.select_one("div[id='benefits']")
if benefits_holder:
benefits = benefits_holder.text

job_data = {
"name": row["name"],
"salary": salary,
"description": description,
"benefits": benefits
}

print(job_data)
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_job(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

We now have a basic structure that we can use to finish building our scraper. process_job() looks up an individual job posting and parses its information. process_results() calls process_job() on every single job saved in our CSV file from the crawl.


Step 3: Storing the Scraped Data

To store our data, we need to create another dataclass. This one holds information from an individual job page. Take a look at JobData.

@dataclass
class JobData:
    name: str = ""
    salary: str = ""
    description: str = ""
    benefits: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

This class is basically the same as SearchData; it just holds fewer fields. Now we need to add a DataPipeline to our parsing function in order to save this information to a file.
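Concretely, the only changes inside process_job() are to open a small DataPipeline named after the job, swap the plain dictionary for a JobData object, and close the pipeline once the data has been added. Roughly:

# Inside process_job(), once the page has been parsed:
job_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")

job_data = JobData(
    name=row["name"],
    salary=salary,
    description=description,
    benefits=benefits
)

job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True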

Here is our full code up to this point.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class JobData:
name: str = ""
salary: str = ""
description: str = ""
benefits: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_locality}&start={page_number * 10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.select("div[data-testid='slider_item']")


for div_card in div_cards:

name = div_card.select_one("h2").text

parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"

company_name = div_card.select_one("span[data-testid='company-name']").text

rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text

location = div_card.select_one("div[data-testid='text-location']").text


search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)


data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_job(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
job_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
soup = BeautifulSoup(response.text, "html.parser")

salary = "n/a"
salary_holder = soup.select_one("div[id='salaryInfoAndJobContainer']")
if salary_holder:
salary = salary_holder.text
description = "n/a"
description_holder = soup.select_one("div[id='jobDescriptionText']")
if description_holder:
description = description_holder.text
benefits = "n/a"
benefits_holder = soup.select_one("div[id='benefits']")
if benefits_holder:
benefits = benefits_holder.text

job_data = JobData(
name=row["name"],
salary=salary,
description=description,
benefits=benefits
)

job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_job(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

We're now fetching and storing the proper data. In the coming sections, it will be time to optimize our scraper.


Step 4: Adding Concurrency

When we added concurrency earlier, we simply refactored a for loop and replaced it with ThreadPoolExecutor. We'll be doing exactly this again with process_results().

Here is our finalized process_results() function.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_job,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

Just like before, our arguments to executor.map() are as follows:

  • process_job is the function we want to call on every available thread.
  • All other arguments get passed in as arrays.

Step 5: Bypassing Anti-Bots

Time to bypass anti-bots again. We already have our get_scrapeops_url() function; we just need to drop it into a single line to unlock the power of the proxy.

response = requests.get(get_scrapeops_url(url, location=location))

Take a look below. This is what our code looks like now that it's ready to run in production.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    stars: float = None
    company_name: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class JobData:
    name: str = ""
    salary: str = ""
    description: str = ""
    benefits: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_location = locality.replace(" ", "+")
    url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_location}&start={page_number * 10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True

            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data

            soup = BeautifulSoup(response.text, "html.parser")

            div_cards = soup.select("div[data-testid='slider_item']")

            for div_card in div_cards:

                name = div_card.select_one("h2").text

                parsed_url = urlparse(div_card.find("a").get("href"))
                query_params = parse_qs(parsed_url.query)
                has_job_key = "jk" in query_params.keys()
                if not has_job_key:
                    continue
                job_key = query_params["jk"][0]
                url = f"https://www.indeed.com/viewjob?jk={job_key}"

                company_name = div_card.select_one("span[data-testid='company-name']").text

                rating = None
                rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
                if rating_holder:
                    rating = rating_holder.text

                location = div_card.select_one("div[data-testid='text-location']").text

                search_data = SearchData(
                    name=name,
                    url=url,
                    stars=rating,
                    company_name=company_name,
                    location=location
                )

                data_pipeline.add_data(search_data)
                logger.info(f"Successfully parsed data from: {url}")
                success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_job(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Request the job page through the ScrapeOps proxy; errors are caught and retried below
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                job_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
                soup = BeautifulSoup(response.text, "html.parser")

                salary = "n/a"
                salary_holder = soup.select_one("div[id='salaryInfoAndJobContainer']")
                if salary_holder:
                    salary = salary_holder.text
                description = "n/a"
                description_holder = soup.select_one("div[id='jobDescriptionText']")
                if description_holder:
                    description = description_holder.text
                benefits = "n/a"
                benefits_holder = soup.select_one("div[id='benefits']")
                if benefits_holder:
                    benefits = benefits_holder.text

                job_data = JobData(
                    name=row["name"],
                    salary=salary,
                    description=description,
                    benefits=benefits
                )

                job_pipeline.add_data(job_data)
                job_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_job,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"
    LOCALITY = "Westland MI"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["writer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
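
The with open("config.json") block at the top of this script only reads an api_key field. If you'd rather create that file from Python instead of by hand, a minimal (hypothetical) snippet looks like this:

import json

# Write the config.json that the scraper reads at startup
with open("config.json", "w") as config_file:
    json.dump({"api_key": "YOUR-SCRAPEOPS-API-KEY"}, config_file)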

Step 6: Production Run

Let's test this all out in production. Here is our updated main. Since we know that we can crawl pages at approximately 18.86 seconds per page, we'll crawl just one page this time.

Once again, feel free to change constants in order to tweak your results.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 2
    PAGES = 1
    LOCATION = "us"
    LOCALITY = "Westland MI"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["writer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

We were getting server errors with 5 threads, so we switched to 2. This isn't uncommon when a server is being hit with too many concurrent requests. With 2 threads, we completed the crawl and the scrape in 194.38 seconds.

If you remember, our crawl earlier ran at 18.86 seconds per page, so 194.38 - 18.86 = 175.52 seconds were spent on the scrape. In total, we scraped 15 separate job postings.

175.52 seconds / 15 jobs = 11.7 seconds per job.
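
If you want to benchmark your own runs, one rough approach (not shown in the script above) is to record time.time() around the crawl and the scrape:

import time

start_time = time.time()
# ... run the crawl here (start_scrape() for each keyword) ...
crawl_time = time.time() - start_time

# ... run the scrape here (process_results() for each aggregate file) ...
total_time = time.time() - start_time

jobs_scraped = 15  # for example, the number of rows in your crawl CSV
scrape_time = total_time - crawl_time
print(f"Crawl: {crawl_time:.2f}s, scrape: {scrape_time:.2f}s, "
      f"{scrape_time / jobs_scraped:.2f}s per job")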


Legal and Ethical Considerations

Whenever you scrape a website, you are subject to both its Terms of Service and its robots.txt.

  • You may view Indeed's terms here.
  • Their robots.txt is also available here.

It's important to note that most sites can suspend or even permanently ban you for violating their terms.
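
If you want to check programmatically whether a path is disallowed before you request it, Python's standard library ships urllib.robotparser. A quick sketch (not part of the scraper above):

from urllib.robotparser import RobotFileParser

parser = RobotFileParser()
parser.set_url("https://www.indeed.com/robots.txt")
parser.read()  # downloads and parses robots.txt

# Returns True if the rules allow a generic user agent to fetch this URL
print(parser.can_fetch("*", "https://www.indeed.com/jobs?q=writer"))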

On another note, when scraping the web, public data is generally fair game. If you don't have to log in to a site to view the data, that data is public.

Data gated behind a login is generally considered private. When working with private data, you often need permission from the site you're scraping, and you can be sued for accessing or disseminating it.

If you're unsure whether your scraper is legal, consult an attorney.


Conclusion

You've finished the tutorial! You now know how to use Requests and BeautifulSoup, you should have a decent grasp of CSS selectors, and you should have a solid understanding of parsing, pagination, data storage, concurrency, and proxy integration.

If you'd like to know more about the tech stack used in this article, take a look at the links below.


More Python Web Scraping Guides

Here at ScrapeOps, we've got a ton of learning material. Whether you're building your first ever scraper, or you've been scraping for years, we've got something for you.

Check out our Python Web Scraping Playbook! If you're interested in more of our "How To Scrape" series, check out the articles below!