How to Scrape Indeed With Selenium
If you're looking for a job, Indeed is a great place to start. It has been around since 2004 and features job listings from all over the world, which gives us access to a huge dataset. There are so many job postings that it's nearly impossible to go through them all manually. This is where scraping becomes useful.
Today, we’ll learn how to build an Indeed scraper. It will help gather data from Indeed and create reports on available jobs.
- TLDR: How to Scrape Indeed
- How To Architect Our Indeed Scraper
- Understanding How To Scrape Indeed
- Setting Up Our Indeed Scraper Project
- Build An Indeed Search Crawler
- Build An Indeed Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Indeed
If you need to scrape Indeed but don't have time for a tutorial, use the scraper below.
- Start by creating a new project folder.
- Add the scraper file and a config.json file with your ScrapeOps API key (a minimal example of the config file follows this list).
- Once that's set up, run the command: python name_of_your_script.py.
- The scraper will create a crawler file. It will also generate individual reports for each job it finds.
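For reference, config.json only needs the api_key field that the script reads at startup. A minimal example (with a placeholder key) looks like this:

{
    "api_key": "YOUR-SCRAPEOPS-API-KEY"
}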
import os
import csv
import json
import logging
from urllib.parse import urlencode
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
import concurrent.futures
import time
API_KEY = "Add-Your-Api-key-here"
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str
salary: str
description: str
benefits: str
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
# Set up Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
driver.get(scrapeops_proxy_url)
logger.info(f"Received page from: {url}")
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.find_element(By.TAG_NAME, "h2").text
job_link = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
job_key = job_link.split("jk=")[-1] if "jk=" in job_link else None
if not job_key:
continue
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.find_element(By.CSS_SELECTOR, "span[data-testid='company-name']").text
rating = None
rating_holder = div_card.find_elements(By.CSS_SELECTOR, "span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder[0].text
location = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_job(row, retries=3):
url = row["url"]
tries = 0
success = False
chrome_options = Options()
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Status: {driver.title}")
salary = "n/a"
try:
salary_holder = driver.find_elements(By.XPATH, "//div[@id='salaryInfoAndJobContainer']") or \
driver.find_elements(By.CSS_SELECTOR, "span[class*='salary']") or \
driver.find_elements(By.XPATH, "//div[contains(text(), 'Salary')]")
if salary_holder:
salary = salary_holder[0].text
except NoSuchElementException:
logger.warning("Salary information not found.")
description = "n/a"
try:
description_holder = driver.find_element(By.ID, "jobDescriptionText")
if description_holder:
description = description_holder.text
except NoSuchElementException:
logger.warning("Description not found.")
benefits = "n/a"
try:
benefits_holder = driver.find_elements(By.ID, "benefits") or \
driver.find_elements(By.CSS_SELECTOR, "div[data-testid='benefits-test']") or \
driver.find_elements(By.CSS_SELECTOR, "div.css-eynugf.eu4oa1w0")
if benefits_holder:
benefits = benefits_holder[0].text
except NoSuchElementException:
logger.warning("Benefits information not found.")
job_data = JobData(
name=row["name"],
salary=salary,
description=description,
benefits=benefits
)
# Save job data to a CSV file
job_filename = f"{row['name'].replace(' ', '_')}.csv"
keys = [field.name for field in fields(job_data)]
file_exists = os.path.isfile(job_filename) and os.path.getsize(job_filename) > 0
with open(job_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
writer.writerow(asdict(job_data))
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
# Use threading for processing job details
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(process_job, row, retries): row for row in reader
}
for future in concurrent.futures.as_completed(futures):
row = futures[future]
try:
future.result()
                except Exception as e:
                    logger.error(f"Exception thrown while processing {row['url']}: {e}")
    logger.info(f"Finished processing {csv_file}")


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"
    LOCALITY = "Westland MI"

    logger.info("Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["writer"]
    aggregate_files = []

    # Crawl: gather job listings for each keyword
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
            executor.map(
                scrape_search_results,
                [keyword] * PAGES,
                [LOCATION] * PAGES,
                [LOCALITY] * PAGES,
                range(PAGES),
                [crawl_pipeline] * PAGES,
                [MAX_RETRIES] * PAGES
            )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")

    # Scrape: process each job found by the crawler
    for file in aggregate_files:
        process_results(file, retries=MAX_RETRIES)
When you run the scraper, you can adjust the following constants:
- MAX_RETRIES: Sets how many times the scraper will retry if a request fails. This can happen due to network errors or non-200 HTTP responses.
- MAX_THREADS: Limits the number of threads running the scraper at the same time.
- PAGES: Controls how many search result pages to scrape for each keyword.
- LOCATION: Sets the country or region code for the scraping.
- LOCALITY: Specifies the city or area to narrow down the search.
- keyword_list: A list of keywords the scraper will use to search for jobs on Indeed.
How To Architect Our Indeed Scraper
To create our Indeed scraper project, we’ll first need to scrape individual job listings from a keyword search. This part of the process is called a crawl.
Our crawler must perform the following tasks:
- Conduct a search and parse the results.
- Paginate the results for better data management.
- Save important information to a CSV file.
- Steps 1 through 3 should be done concurrently to increase speed and efficiency.
- Integrating a proxy will help prevent us from getting blocked.
Once the crawler is functioning, it will generate reports for various jobs. Next, we will need to gather detailed information about each job. This is when we will build our actual scraper.
The scraper will perform the following tasks:
- Read the crawler's report.
- Parse the individual job results.
- Save these results in a separate report.
- Steps 2 and 3 will run concurrently until the job is complete.
- We will also integrate with a proxy to avoid anti-bot measures and any other obstacles.
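Before we write any real code, it helps to picture how the two stages fit together. The sketch below is only an outline with placeholder function names; the actual implementations are built step by step in the rest of this guide.

def crawl(keyword, pages, csv_filename):
    # Stage 1: search Indeed for the keyword, parse each result page
    # concurrently, and write the job listings to a CSV report.
    ...

def scrape_jobs(csv_filename):
    # Stage 2: read the crawler's report and scrape each individual
    # job page concurrently, saving a separate report per job.
    ...

if __name__ == "__main__":
    crawl("writer", pages=3, csv_filename="writer.csv")
    scrape_jobs("writer.csv")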
Understanding How To Scrape Indeed
Before we start building this project, we need to have a better understanding of the data we need to collect from Indeed.
Step 1: How To Request Indeed Pages
As with any website, we need to make a GET request. Here's the URL we'll be using:
https://www.indeed.com/jobs?q=writer&l=Westland%2C+MI&start=10&vjk=a88c42edb7b19c5d
Let’s break it down:
- https://www.indeed.com/jobs is the endpoint we're accessing on the server.
- ?q=writer&l=Westland%2C+MI&start=10&vjk=a88c42edb7b19c5d is the query string.
Queries give the server more details to refine the results. For example, q=writer tells Indeed that we want to search for writing jobs.
If we want to look for writing jobs without any other criteria, the URL would be:
https://www.indeed.com/jobs?q=writer
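If you'd rather not hand-encode query strings like this, urllib.parse.urlencode can build them for you. The snippet below is only an illustration of the URL format described above, not part of the final scraper:

from urllib.parse import urlencode

def build_search_url(keyword, location):
    # urlencode handles spaces (+) and commas (%2C) for us
    return "https://www.indeed.com/jobs?" + urlencode({"q": keyword, "l": location})

print(build_search_url("writer", "Westland, MI"))
# https://www.indeed.com/jobs?q=writer&l=Westland%2C+MI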
You can view Indeed's search page below.
Individual job pages appear as shown below.
Step 2: How To Extract Data From Indeed Results and Pages
When extracting data from Indeed search results, it's important to note that each result is contained within its own div card. If we locate this card, we can access all the information inside it.
Each of these cards has a data-testid attribute set to slider_item.
On each individual job page, we can locate the job description. Similar to the example above, our data is contained within a div card.
This div is identified by the id jobDescriptionText and holds the complete job description.
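To make this concrete, here is a minimal Selenium snippet that targets both elements described above. It assumes driver is a WebDriver instance that has already loaded the relevant page:

from selenium.webdriver.common.by import By

# On a search results page: one card per job listing
cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='slider_item']")

# On an individual job page: the full description lives in a single div
description = driver.find_element(By.ID, "jobDescriptionText").text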
Step 3: How To Control Pagination
Do you remember the URL we discussed earlier? Here it is again:
https://www.indeed.com/jobs?q=writer&l=Westland%2C+MI&start=10&vjk=a88c42edb7b19c5d
Notice the following query: start=10.
On Indeed, each page number corresponds to a multiple of 10:
- Page 1 is start=0.
- Page 2 is start=10.
- Page 3 is start=20.
We can change the start parameter to paginate our results.
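As a quick sketch (mirroring the URL format our crawler uses later in this guide), converting a zero-indexed page number into a start value looks like this:

def search_page_url(keyword, locality, page_number):
    # Page 0 -> start=0, page 1 -> start=10, page 2 -> start=20
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    return (
        "https://www.indeed.com/jobs"
        f"?q={formatted_keyword}&l={formatted_locality}&start={page_number * 10}"
    )

for page in range(3):
    print(search_page_url("writer", "Westland MI", page))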
Step 4: Geolocated Data
To manage geolocation, we need to do two things.
- First, we should look at another parameter: l=Westland%2C+MI.
The l parameter indicates the location we want to search. For example, to search in London, we would use l=London%2CUK.
- The second aspect of geolocation doesn't involve Indeed directly; it relates to ScrapeOps. When we interact with the ScrapeOps API, we can include a country parameter.
- If we specify "country": "us", we'll be routed through a server in the US.
- If we want to appear as if we're in the UK, we can use "country": "uk".
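Putting the two pieces together, a London search routed through a UK server might look like the sketch below. It assumes the get_scrapeops_url() helper that we build later in this guide:

# Indeed-side geolocation: the l parameter narrows where we search
url = "https://www.indeed.com/jobs?q=writer&l=London%2CUK"

# ScrapeOps-side geolocation: the country parameter controls which
# country the request is routed through
proxied_url = get_scrapeops_url(url, location="uk")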
Setting Up Our Indeed Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder:
mkdir indeed-scraper
cd indeed-scraper
Create a New Virtual Environment:
python -m venv venv
Activate the Environment:
source venv/bin/activate
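(If you're on Windows, the equivalent activation command is typically venv\Scripts\activate.)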
Install Our Dependencies:
pip install webdriver-manager
pip install selenium
Build An Indeed Search Crawler
It's time to begin building our crawler. We will add the following components step by step:
- Create a parser.
- Implement pagination.
- Store the parsed data.
- Introduce concurrency.
- Integrate a proxy.
Step 1: Create Simple Search Data Parser
We will begin with a simple data parser. The purpose of our parsing function is straightforward: it needs to conduct a search and extract data from the results.
The code below establishes our basic structure, including error handling, retry logic, and, of course, our fundamental parsing function.
import os
import csv
import json
import logging
from urllib.parse import urlencode
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
import concurrent.futures
import time
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load API key from config
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Function to scrape search results
def scrape_search_results(keyword, location, locality, retries=3):
# Set up the Chrome driver
options = webdriver.ChromeOptions()
options.add_argument("--headless") # Run in headless mode for background execution
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
formatted_keyword = keyword.replace(" ", "+")
formatted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_location}"
tries = 0
success = False
while tries < retries and not success:
try:
logger.info(f"Fetching URL: {url}")
driver.get(url)
time.sleep(3) # Wait for the page to load
# Extract job cards
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='slider_item']")
search_data_list = []
for div_card in div_cards:
try:
name = div_card.find_element(By.TAG_NAME, "h2").text
job_link = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
job_key = job_link.split("jk=")[-1] if "jk=" in job_link else None
url = f"https://www.indeed.com/viewjob?jk={job_key}" if job_key else None
company_name = div_card.find_element(By.CSS_SELECTOR, "span[data-testid='company-name']").text
rating = None
                    rating_holder = div_card.find_elements(By.CSS_SELECTOR, "span[data-testid='holistic-rating']")
                    if rating_holder:
                        rating = rating_holder[0].text
location = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='text-location']").text
search_data = {
"name": name,
"url": url,
"stars": rating,
"company_name": company_name,
"location": location,
}
search_data_list.append(search_data)
print(search_data)
except Exception as inner_e:
logger.error(f"Error extracting data from div card: {inner_e}")
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
if not success:
logger.error(f"Max retries exceeded for URL: {url}")
driver.quit() # Close the browser after scraping
if __name__ == "__main__":
MAX_RETRIES = 3
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
# Input: List of keywords to scrape
keyword_list = ["writer"]
# Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, LOCALITY, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
The function scrape_search_results()
performs the following tasks:
- Opens the search page with Selenium and waits a few seconds for it to load.
- If anything goes wrong during parsing, it logs the error and retries the request.
- Retrieves the result cards using:
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='slider_item']")
For each div_card, we:
- Find the job name with:
name = div_card.find_element(By.TAG_NAME, "h2").text
- Parse the URL to obtain the job key.
- Retrieve the company name.
- Check for a rating; if it exists, save it to the rating variable.
- Get the location using:
location = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='text-location']").text
We now have a basic parsing function and a structured approach for the rest of our code.
Step 2: Add Pagination
Next, we need to implement pagination. This is straightforward. We only need to modify our URL slightly and create a function that calls scrape_search_results()
for multiple pages.
Our updated URL now looks like this:
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_location}&start={page_number * 10}"
Now, we'll add a start_scrape()
function. This function is very simple; it iterates through the pages and calls scrape_search_results()
for each one:
def start_scrape(keyword, pages, location, locality, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)
Here is our complete script up to this point.
import json
import logging
import time
from urllib.parse import urlencode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load API key from config
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Function to scrape search results
def scrape_search_results(keyword, location, locality, page_number, retries=3):
# Set up the Chrome driver
options = webdriver.ChromeOptions()
options.add_argument("--headless") # Run in headless mode for background execution
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)
formatted_keyword = keyword.replace(" ", "+")
formatted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_location}&start={page_number * 10}"
tries = 0
success = False
while tries < retries and not success:
try:
logger.info(f"Fetching URL: {url}")
driver.get(url)
time.sleep(3) # Wait for the page to load
# Extract job cards
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='slider_item']")
search_data_list = []
for div_card in div_cards:
try:
name = div_card.find_element(By.TAG_NAME, "h2").text
job_link = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
job_key = job_link.split("jk=")[-1] if "jk=" in job_link else None
url = f"https://www.indeed.com/viewjob?jk={job_key}" if job_key else None
company_name = div_card.find_element(By.CSS_SELECTOR, "span[data-testid='company-name']").text
rating = None
                    rating_holder = div_card.find_elements(By.CSS_SELECTOR, "span[data-testid='holistic-rating']")
                    if rating_holder:
                        rating = rating_holder[0].text
location = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='text-location']").text
search_data = {
"name": name,
"url": url,
"stars": rating,
"company_name": company_name,
"location": location
}
search_data_list.append(search_data)
print(search_data)
except Exception as inner_e:
logger.error(f"Error extracting data from div card: {inner_e}")
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
if not success:
logger.error(f"Max retries exceeded for URL: {url}")
driver.quit() # Close the browser after scraping
def start_scrape(keyword, pages, location, locality, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
# Input: List of keywords to scrape
keyword_list = ["writer"]
PAGES = 2 # Specify the number of pages you want to scrape
# Job Processes
for keyword in keyword_list:
start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
With pagination in place, we can now manage our search results effectively.
We’ve updated our URL for pagination and created a function to parse multiple pages. This allows us to fetch and extract our data properly.
In the next section, we will implement a way to store this data.
Step 3: Storing the Scraped Data
To store our data, we will create two classes.
- The first is a dataclass named SearchData.
- This class is designed to hold data and represent a div_card object from our parsing function.
- The second class is DataPipeline.
- This class opens a pipeline to a CSV file and processes the data through it. It also utilizes the name field to filter out duplicates.
Here is the SearchData class.
from dataclasses import dataclass, fields
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is our DataPipeline.
import os
import csv
import time
from dataclasses import asdict, fields
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
After combining everything, our script appears as follows.
import os
import csv
import json
import logging
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
# Load configuration
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
logger.info("Saving data to CSV.")
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
logger.info("No data to save.")
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
logger.info(f"Saved: {item}")
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
logger.info(f"Adding data: {scraped_data.name}")
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
logger.info(f"Data added to storage queue. Queue size: {len(self.storage_queue)}")
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome() # Ensure chromedriver is installed and in PATH
driver.get(url)
time.sleep(3) # Wait for page to load
logger.info(f"Fetching URL: {url}")
# Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.find_element(By.TAG_NAME, "h2").text
job_url = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
company_name = div_card.find_element(By.CSS_SELECTOR, "span[data-testid='company-name']").text
rating = None
try:
rating = div_card.find_element(By.CSS_SELECTOR, "span[data-testid='holistic-rating']").text
except Exception:
pass # Rating may not be present
location = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=job_url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
tries += 1
time.sleep(2)
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
PAGES = 2
LOCATION = "us"
LOCALITY = "mi"
logger.info("Crawl starting...")
keyword_list = ["writer"]
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info("Crawl complete.")
Step 4: Adding Concurrency
To enhance our speed and efficiency, we need to implement concurrency. This will be quite straightforward.
We'll refactor the start_scrape() function by removing the for loop and replacing it with ThreadPoolExecutor.
Here is the updated function:
import concurrent.futures
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages, # Repeated keyword for all pages
[location] * pages, # Repeated location for all pages
[locality] * pages, # Repeated locality for all pages
range(pages), # Page numbers to iterate
[data_pipeline] * pages, # Same data pipeline instance
[retries] * pages # Retry count for all tasks
)
Take a look at the arguments for executor.map():
- scrape_search_results() is the function we want to execute on each thread.
- The other arguments are the parameters that will be passed into scrape_search_results().
- We provide them as arrays, which are then sent into scrape_search_results().
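Under the hood, executor.map() pairs these iterables element by element, which is why the scalar arguments are repeated with * pages; each page number in range(pages) then receives its own complete set of arguments.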
Step 5: Bypassing Anti-Bots
Our scraper is nearly complete, but it still needs to get around one obstacle: anti-bot systems, which are designed to identify and block unwanted bots from accessing a website.
To bypass these anti-bots (and any other barriers), we will use the ScrapeOps Proxy API.
The function below takes a regular URL and converts it into a ScrapeOps proxied URL.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
Here’s a look at the payload that we send to ScrapeOps:
- api_key: Your ScrapeOps API key.
- url: The URL you want to scrape.
- country: The country through which we want to be routed.
- residential: A boolean value. If set to True, ScrapeOps provides us with a residential IP address instead of a datacenter IP.
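As a quick illustration (with a placeholder API key), wrapping an Indeed search URL produces a proxy URL whose query string carries the encoded target:

proxied = get_scrapeops_url("https://www.indeed.com/jobs?q=writer", location="uk")
print(proxied)
# https://proxy.scrapeops.io/v1/?api_key=YOUR-API-KEY&url=https%3A%2F%2Fwww.indeed.com%2Fjobs%3Fq%3Dwriter&country=uk&residential=True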
Here is our complete code, ready for production.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode, urlparse, parse_qs
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
import time
# Load API key from configuration file
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True
else:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.select_one("h2").text
parsed_url = urlparse(div_card.find("a").get("href"))
query_params = parse_qs(parsed_url.query)
has_job_key = "jk" in query_params.keys()
if not has_job_key:
continue
job_key = query_params["jk"][0]
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.select_one("span[data-testid='company-name']").text
rating = None
rating_holder = div_card.select_one("span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder.text
location = div_card.select_one("div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
tries += 1
logger.info(f"Retrying request for page: {url}, retries left {retries - tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "california"
logger.info(f"Crawl starting...")
keyword_list = ["writer"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Now it's time to run the crawler in production and see how it performs. With PAGES set to 3, here is the updated main block:
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
keyword_list = ["writer"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
You can always modify the constants given below:
- MAX_RETRIES
- MAX_THREADS
- PAGES
- LOCATION
- LOCALITY
You can see the results for three pages:
Our crawler scraped 3 pages in approximately 17.039 seconds, which comes out to roughly 5.679 seconds per result page.
The results may vary depending on:
- Your location
- Hardware
- Speed of internet connection
Build An Indeed Scraper
Our crawler generates the CSV files. The next task is to build a scraper that performs the tasks given below:
- Reading the CSV file.
- Parsing the jobs from the CSV file.
- Storing the parsed data.
- Parsing the pages concurrently.
- Integrate with the ScrapeOps Proxy API.
Step 1: Create Simple Job Data Parser
First, we will write a basic parsing function. It includes retry logic and error handling, and it lays the groundwork for the sections that follow.
Here is process_job():
def process_job(row, location, retries=3):
url = row["url"]
tries = 0
success = False
chrome_options = Options()
chrome_options.add_argument("--headless")
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
driver.get(get_scrapeops_url(url, location))
try:
if driver.title:
logger.info(f"Status: 200")
job_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
salary = "n/a"
try:
salary_holder = driver.find_elements(By.ID, "salaryInfoAndJobContainer")
if salary_holder:
salary = salary_holder[0].text
except NoSuchElementException:
logger.warning("Salary info not found.")
description = "n/a"
try:
description_holder = driver.find_elements(By.ID, "jobDescriptionText")
if description_holder:
description = description_holder[0].text
except NoSuchElementException:
logger.warning("Job description not found.")
benefits = "n/a"
try:
benefits_holder = driver.find_elements(By.ID, "benefits")
if benefits_holder:
benefits = benefits_holder[0].text
except NoSuchElementException:
logger.warning("Benefits info not found.")
job_data = JobData(
name=row["name"],
salary=salary,
description=description,
benefits=benefits
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True
else:
logger.warning("Failed Response: Not Found")
raise Exception("Failed Request, page not loaded.")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries - tries}")
tries += 1
driver.quit()
if not success:
raise Exception("Max Retries exceeded: {retries}")
While we still have retries left and the operation has not succeeded, we extract the necessary job data:
- We check whether the salary is present with driver.find_elements(By.ID, "salaryInfoAndJobContainer"). If the element is found, we pull the salary text.
- Check for a description the same way: driver.find_elements(By.ID, "jobDescriptionText").
- Check for the presence of benefits the same way as well: driver.find_elements(By.ID, "benefits").
Step 2: Loading URLs To Scrape
To utilize our parsing function, we need to read data from a CSV file. We'll create another function called process_results(), which will be quite similar to start_scrape().
Here's how process_results() works:
First, we read the CSV file into an array.
Then, we loop through that array and call process_job()
for each row.
def process_results(csv_file, location, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_job(row, location, retries=retries)
Here is the complete code:
import os
import csv
import json
import logging
from urllib.parse import urlencode
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
import concurrent.futures
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
# Set up Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless") # Optional: Run in headless mode
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
driver.get(scrapeops_proxy_url)
logger.info(f"Received page from: {url}")
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.find_element(By.TAG_NAME, "h2").text
job_link = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
job_key = job_link.split("jk=")[-1] if "jk=" in job_link else None
if not job_key:
continue
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.find_element(By.CSS_SELECTOR, "span[data-testid='company-name']").text
rating = None
rating_holder = div_card.find_elements(By.CSS_SELECTOR, "span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder[0].text
location = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
finally:
driver.quit() # Ensure the driver closes after processing
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_job(row, location, retries=3):
url = row["url"]
tries = 0
success = False
chrome_options = Options()
chrome_options.add_argument("--headless") # Optional: Run in headless mode
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Status: {driver.title}")
salary = "n/a"
salary_holder = driver.find_elements(By.ID, "salaryInfoAndJobContainer")
if salary_holder:
salary = salary_holder[0].text
description = "n/a"
description_holder = driver.find_elements(By.ID, "jobDescriptionText")
if description_holder:
description = description_holder[0].text
benefits = "n/a"
benefits_holder = driver.find_elements(By.ID, "benefits")
if benefits_holder:
benefits = benefits_holder[0].text
job_data = {
"name": row["name"],
"salary": salary,
"description": description,
"benefits": benefits
}
print(job_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
driver.quit() # Close the browser after processing
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_job(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
executor.map(
scrape_search_results,
[keyword] * PAGES,
[LOCATION] * PAGES,
[LOCALITY] * PAGES,
range(PAGES),
[crawl_pipeline] * PAGES,
[MAX_RETRIES] * PAGES
)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
We need to generate another dataclass to help us store our data. This one holds information from an individual job page.
Check out our JobData class.
@dataclass
class JobData:
name: str = ""
salary: str = ""
description: str = ""
benefits: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Notice that this class closely resembles SearchData, just with fewer fields. Now we need to add a DataPipeline to our parsing function so that we can save this information to a file.
Here is the complete code:
import os
import csv
import json
import logging
from urllib.parse import urlencode
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
import concurrent.futures
import time
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str
salary: str
description: str
benefits: str
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
chrome_options = Options()
chrome_options.add_argument("--headless")
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
driver.get(scrapeops_proxy_url)
logger.info(f"Received page from: {url}")
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.find_element(By.TAG_NAME, "h2").text
job_link = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
job_key = job_link.split("jk=")[-1] if "jk=" in job_link else None
if not job_key:
continue
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.find_element(By.CSS_SELECTOR, "span[data-testid='company-name']").text
rating = None
rating_holder = div_card.find_elements(By.CSS_SELECTOR, "span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder[0].text
location = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_job(row, data_pipeline, retries=3):
url = row["url"]
tries = 0
success = False
chrome_options = Options()
chrome_options.add_argument("--headless")
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Status: {driver.title}")
salary = "n/a"
salary_holder = driver.find_elements(By.ID, "salaryInfoAndJobContainer")
if salary_holder:
salary = salary_holder[0].text
description = "n/a"
description_holder = driver.find_elements(By.ID, "jobDescriptionText")
if description_holder:
description = description_holder[0].text
benefits = "n/a"
benefits_holder = driver.find_elements(By.ID, "benefits")
if benefits_holder:
benefits = benefits_holder[0].text
job_data = JobData(
name=row["name"],
salary=salary,
description=description,
benefits=benefits
)
job_filename = f"{row['name'].replace(' ', '_')}.csv"
keys = [field.name for field in fields(job_data)]
file_exists = os.path.isfile(job_filename) and os.path.getsize(job_filename) > 0
with open(job_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
writer.writerow(asdict(job_data))
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, data_pipeline, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_job(row, data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
keyword_list = ["writer"]
aggregate_files = []
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        job_pipeline = DataPipeline(csv_filename=f"{filename}_jobs.csv")
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
executor.map(
scrape_search_results,
[keyword] * PAGES,
[LOCATION] * PAGES,
[LOCALITY] * PAGES,
range(PAGES),
[crawl_pipeline] * PAGES,
[MAX_RETRIES] * PAGES
)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
process_results(f"{filename}.csv", job_pipeline, retries=MAX_RETRIES)
job_pipeline.close_pipeline()
aggregate_files.append(f"{filename}_jobs.csv")
logger.info(f"Done. Saved {len(aggregate_files)} files.")
We can now fetch and store the data. Let's optimize our scraper next.
Step 4: Adding Concurrency
When we implemented concurrency earlier, we refactored a for loop and replaced it with ThreadPoolExecutor. We'll do the same for process_results().
Here is the final version of the process_results() function.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
# Use ThreadPoolExecutor for concurrent processing of job data
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_job,
reader,
[location] * len(reader),
[retries] * len(reader)
)
logger.info(f"Finished processing {csv_file}")
Here are the arguments to executor.map():
- process_job is the function we call on every available thread.
- All other arguments get passed in as arrays.
Step 5: Bypassing Anti-Bots
It's time to bypass anti-bots once more. We already have our get_scrapeops_url() function; we just need a single line to put the proxy to work.
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
Take a look at the updated code:
import os
import csv
import json
import logging
from urllib.parse import urlencode
from dataclasses import dataclass, field, fields, asdict
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
import concurrent.futures
import time
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
stars: float = None
company_name: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str
salary: str
description: str
benefits: str
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_location = locality.replace(" ", "+")
url = f"https://www.indeed.com/jobs?q={formatted_keyword}&l={formatted_location}&start={page_number * 10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
# Set up Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
driver.get(scrapeops_proxy_url)
logger.info(f"Received page from: {url}")
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-testid='slider_item']")
for div_card in div_cards:
name = div_card.find_element(By.TAG_NAME, "h2").text
job_link = div_card.find_element(By.TAG_NAME, "a").get_attribute("href")
job_key = job_link.split("jk=")[-1] if "jk=" in job_link else None
if not job_key:
continue
url = f"https://www.indeed.com/viewjob?jk={job_key}"
company_name = div_card.find_element(By.CSS_SELECTOR, "span[data-testid='company-name']").text
rating = None
rating_holder = div_card.find_elements(By.CSS_SELECTOR, "span[data-testid='holistic-rating']")
if rating_holder:
rating = rating_holder[0].text
location = div_card.find_element(By.CSS_SELECTOR, "div[data-testid='text-location']").text
search_data = SearchData(
name=name,
url=url,
stars=rating,
company_name=company_name,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_job(row, retries=3):
url = row["url"]
tries = 0
success = False
chrome_options = Options()
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
while tries <= retries and not success:
try:
driver.get(url)
logger.info(f"Status: {driver.title}")
salary = "n/a"
try:
salary_holder = driver.find_elements(By.XPATH, "//div[@id='salaryInfoAndJobContainer']") or \
driver.find_elements(By.CSS_SELECTOR, "span[class*='salary']") or \
driver.find_elements(By.XPATH, "//div[contains(text(), 'Salary')]")
if salary_holder:
salary = salary_holder[0].text
except NoSuchElementException:
logger.warning("Salary information not found.")
description = "n/a"
try:
description_holder = driver.find_element(By.ID, "jobDescriptionText")
if description_holder:
description = description_holder.text
except NoSuchElementException:
logger.warning("Description not found.")
benefits = "n/a"
try:
benefits_holder = driver.find_elements(By.ID, "benefits") or \
driver.find_elements(By.CSS_SELECTOR, "div[data-testid='benefits-test']") or \
driver.find_elements(By.CSS_SELECTOR, "div.css-eynugf.eu4oa1w0")
if benefits_holder:
benefits = benefits_holder[0].text
except NoSuchElementException:
logger.warning("Benefits information not found.")
job_data = JobData(
name=row["name"],
salary=salary,
description=description,
benefits=benefits
)
# Save job data to a CSV file
job_filename = f"{row['name'].replace(' ', '_')}.csv"
keys = [field.name for field in fields(job_data)]
file_exists = os.path.isfile(job_filename) and os.path.getsize(job_filename) > 0
with open(job_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
writer.writerow(asdict(job_data))
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, max_threads=5, retries=3):
logger.info(f"Processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
# Use threading for processing job details
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = {
executor.submit(process_job, row, retries): row for row in reader
}
for future in concurrent.futures.as_completed(futures):
row = futures[future]
try:
future.result()
except Exception as e:
logger.error(f"Error processing job {row['name']}: {e}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "_") + ".csv"
pipeline = DataPipeline(csv_filename=filename)
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
future_threads = {
executor.submit(scrape_search_results, keyword, LOCATION, LOCALITY, i, pipeline): i for i in range(PAGES)
}
logger.info(f"Pipeline Closed...")
pipeline.close_pipeline()
aggregate_files.append(filename)
for csv_file in aggregate_files:
process_results(csv_file)
Step 6: Production Run
Here is our updated main. Since we know that we can crawl pages at approximately 5.679 seconds per page, we'll crawl just one page this time.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 2
PAGES = 1
LOCATION = "us"
LOCALITY = "Westland MI"
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["writer"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "_") + ".csv"
data_pipeline = DataPipeline(csv_filename=filename)
for page in range(PAGES):
scrape_search_results(keyword, LOCATION, LOCALITY, page, data_pipeline=data_pipeline, retries=MAX_RETRIES)
data_pipeline.close_pipeline()
aggregate_files.append(filename)
# Process each CSV file for job details in parallel
for csv_file in aggregate_files:
        process_results(csv_file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
We were getting server errors with 5 threads, so we switched to 2. This is not uncommon when a server is being hit with too many requests at once. With 2 threads, we completed the crawl and the scrape in 38.97 seconds.
If you remember, our crawl earlier came in at roughly 5.68 seconds per page. 38.97 - 5.68 = 33.29 seconds spent scraping. In total, we scraped 15 separate job postings.
33.29 seconds / 15 jobs = 2.22 seconds per job.
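If you'd like to reproduce numbers like these on your own machine, a simple pair of timers around the crawl and the scrape is enough. This is only a rough sketch; the commented-out calls and the job count are placeholders for your own run:

```
import time

crawl_start = time.time()
# scrape_search_results(...) for each page goes here
crawl_time = time.time() - crawl_start

scrape_start = time.time()
# process_results(...) for each CSV goes here
scrape_time = time.time() - scrape_start

jobs_scraped = 15  # number of job rows written during the run
print(f"Crawl: {crawl_time:.2f}s | Scrape: {scrape_time:.2f}s | "
      f"{scrape_time / max(jobs_scraped, 1):.2f}s per job")
```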
Legal and Ethical Considerations
Whenever you scrape a website, you are subject to both its Terms of Service and its `robots.txt`.
It's important to note that most sites can suspend or even permanently ban you for violating their terms.
On another note, public data is generally fair game when scraping the web. If you don't have to log in to a site to view the data, it's considered public.
Data gated behind a login, on the other hand, is generally considered private. When working with private data, you often need permission from the site you're scraping, and you can be sued for accessing or disseminating it.
If you're unsure whether your scraper is legal, consult an attorney.
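If you want to check a site's `robots.txt` programmatically before crawling, Python's built-in `urllib.robotparser` can do it. Here is a minimal sketch; the user agent (`"*"`) and the example URL are only placeholders:

```
from urllib.robotparser import RobotFileParser

# Check whether a given path is allowed before crawling it.
parser = RobotFileParser()
parser.set_url("https://www.indeed.com/robots.txt")
parser.read()

allowed = parser.can_fetch("*", "https://www.indeed.com/jobs?q=writer")
print(f"Allowed to fetch: {allowed}")
```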
Conclusion
You've finished the tutorial! You now know how to use Selenium to crawl and scrape Indeed. You should have a decent grasp of CSS selectors and a solid understanding of parsing, pagination, data storage, concurrency, and proxy integration.
If you'd like to know more about the tech stack used in this article, take a look at the links below.
More Python Web Scraping Guides
Here at ScrapeOps, we've got a ton of learning material. Whether you're building your first-ever scraper or you've been scraping for years, we've got something for you.
Check out our Selenium Web Scraping Playbook! If you're interested in more of our "How To Scrape" series, check out the articles below!