

How to Scrape LinkedIn Jobs With Requests and BeautifulSoup

Since its founding, LinkedIn has been a go-to social network for all sorts of career professionals. Over time, it became a great place for job postings, and in 2016 it was even bought by Microsoft. Because of its widespread use, and because it blends social media with job postings, LinkedIn is a great place to look for a job.

Today, we're going to crawl LinkedIn's job postings and then scrape the individual pages for those listings. This is a great way to collect large amounts of data and get a larger picture of the market for certain jobs.


TLDR - How to Scrape LinkedIn Jobs

Looking to scrape LinkedIn jobs? Look no further!

To use our prebuilt scraper:

  1. Make a new project folder with a config.json file.
  2. Inside the config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
  3. Then copy and paste the code below into a Python file.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class JobData:
    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")

            div_cards = soup.find_all("div", class_="base-search-card__info")
            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=location
                )

                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_posting(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

            job_criteria = soup.find_all("li", class_="description__job-criteria-item")
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = JobData(
                name=row["name"],
                seniority=seniority,
                position_type=position_type,
                job_function=job_function,
                industry=industry
            )
            job_pipeline.add_data(job_data)
            job_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_posting,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Feel free to change any of the following to control your results:

  • MAX_RETRIES: Defines the maximum number of times the script will attempt to retrieve a webpage if the initial request fails (e.g., due to network issues or rate limiting).
  • MAX_THREADS: Sets the maximum number of threads that the script will use concurrently during scraping.
  • PAGES: The number of pages of job listings to scrape for each keyword.
  • LOCATION: The country code or identifier for the region from which job listings should be scraped (e.g., "us" for the United States).
  • LOCALITY: The textual representation of the location where the jobs are being scraped (e.g., "United States").
  • keyword_list: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]).

You can then run your scraper with python name_of_your_script.py. You'll get a CSV named after the keyword you searched. Then, you'll get an individual CSV report on each job as well.
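If you want to sanity-check the crawl output before digging into the individual job reports, here is a minimal sketch that reads the keyword CSV with Python's csv module. It assumes you searched for "software engineer", which produces software-engineer.csv with the columns defined by SearchData.

import csv

# Assumes the crawl produced software-engineer.csv in the current folder.
with open("software-engineer.csv", newline="", encoding="utf-8") as file:
    for row in csv.DictReader(file):
        # Columns come from SearchData: name, job_title, url, location
        print(row["name"], "|", row["job_title"], "|", row["location"])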


How To Architect Our LinkedIn Jobs Scraper

In order to scrape LinkedIn, we're going to build two different scrapers: a search crawler and a job scraper. At the highest level, this process is relatively simple.

  1. First, our crawler runs a keyword search for jobs with a certain title, and then saves the results.
  2. Once we've got results, our scraper will then go through and scrape each individual job posting we find in the results.

Let's break this process down into smaller pieces. We'll start with defining our crawl from start to finish, and then we'll do our scrape.

Step by step, here is how we'll build our crawler:

  1. Write a search results parser to interpret our data.
  2. Add pagination so that we get more results and finer control over them.
  3. Create some classes for data storage, and then use them to save our parsed results.
  4. Use ThreadPoolExecutor to add support for multithreading and therefore concurrency.
  5. Write a function for proxy integration and use it to bypass LinkedIn's anti-bot system.

Here are the steps we'll go through when building our scraper.

  1. Write a parser to pull information from individual job postings.
  2. Give our scraper the ability to read a CSV file.
  3. Add another class for data storage and build the storage into our parsing function.
  4. Add ThreadPoolExecutor to scrape posting data concurrently.
  5. Use our proxy function from earlier to bypass anti-bots.

Understanding How To Scrape LinkedIn Jobs

We can't just plunge straight into coding. We need to look at how this is done from a user standpoint first. We need to understand how to request these pages and how to extract data from them. We also need to see how pagination works and we need to know how to control our location.


Step 1: How To Request LinkedIn Jobs Pages

Anytime you go to a website, it starts with a GET request.

  • Our browser makes a GET to LinkedIn.
  • Then, LinkedIn sends back an HTML page.
  • Our browser reads the page and displays our results.
  • Instead of displaying results, our goal is to write a program that reads the results and finds the relevant information.
  • We save the information we want and get on with our day.

To GET a page of LinkedIn Jobs search results, we'll use the following URL structure:

https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=

If we wanted to search the US for Software Engineer jobs, our URL would look like this:

https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=

As you might have noticed from the URL, we're going to use their built-in API to fetch our jobs. Interestingly enough, this API doesn't give us JSON or XML; it sends back straight HTML. Take a look at our search results below.

Linkedin Job Search HTML
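If you want to poke at this endpoint yourself before building anything, here is a minimal sketch of the request. Keep in mind that LinkedIn may block or rate-limit unproxied traffic, so treat this purely as an illustration of the URL structure.

import requests

formatted_keyword = "software engineer".replace(" ", "+")
formatted_locality = "United States".replace(" ", "+")
url = (
    "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search"
    f"?keywords={formatted_keyword}&location={formatted_locality}&original_referer="
)

response = requests.get(url)
print(response.status_code)   # 200 on a successful request
print(response.text[:500])    # raw HTML, not JSON or XML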

We also need to make another GET when we scrape an individual job posting. If you look at this page, you can see some finer details such as seniority, job function, employment type, and industries.

Linkedin Job page


Step 2: How To Extract Data From LinkedIn Jobs Results and Pages

Now that we know what these pages look like, we need to see exactly where to pull our information from.

  • On the search results page, all of our information is connected to a div card with a class name, base-search-card__info.
  • On a job page, our information comes as an li element with a class name, description__job-criteria-item.

Take a look below at our base-search-card__info.

Linkedin Job Search HTML Inspection
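To make that structure concrete, here is a toy example built on simplified stand-in HTML (the real pages contain far more markup). It shows where each selector points and why the job link is pulled from the card's parent element.

from bs4 import BeautifulSoup

# Simplified stand-in for one search result card.
html = """
<div>
  <a href="https://www.linkedin.com/jobs/view/0000000000">Job link</a>
  <div class="base-search-card__info">
    <h3 class="base-search-card__title">Software Engineer</h3>
    <h4 class="base-search-card__subtitle">Example Corp</h4>
    <span class="job-search-card__location">United States</span>
  </div>
</div>
"""

soup = BeautifulSoup(html, "html.parser")
div_card = soup.find("div", class_="base-search-card__info")
print(div_card.find("h3", class_="base-search-card__title").text.strip())      # job title
print(div_card.find("h4", class_="base-search-card__subtitle").text.strip())   # company name
print(div_card.find("span", class_="job-search-card__location").text.strip())  # location
print(div_card.parent.find("a").get("href"))                                   # job link lives on the parent element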

In this next image, you'll see one of the li items that we would extract.

Linkedin Job page HTML Inspection


Step 3: How To Control Pagination

We use pagination to control our search results. We need to add one parameter to our URL, &start={page_number*10}.

Our full URL for the first page (page 0) of the Software Engineer search would look like this:

https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=&start=0

We use page_number*10 because we begin counting at 0 and each request yields 10 results. Page 0 (0 * 10) gives us results 1 through 10, page 1 gives us 11 through 20, and so on.

Inside our Python code, the URL would look like this:

f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"

Step 4: Geolocated Data

To control our geolocation, we'll be using the ScrapeOps Proxy API. This API can take in all sorts of arguments, but the one we use for this is called country.

  • If we want to appear in the US, we can pass "country": "us" into the API.
  • If we want to appear in the UK, we'd pass "country": "uk".

You can find a full list of ScrapeOps supported countries here.
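Here is a rough sketch of how that country parameter rides along with a proxied request; the API key shown is just a placeholder.

from urllib.parse import urlencode

payload = {
    "api_key": "your-super-secret-api-key",  # placeholder, not a real key
    "url": "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=&start=0",
    "country": "us",  # change to "uk" to appear in the UK
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))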


Setting Up Our LinkedIn Jobs Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir linkedin-jobs-scraper

cd linkedin-jobs-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build A LinkedIn Jobs Search Crawler

We're now ready to build our crawler. We know what it needs to do and we'll implement it in a series of 5 steps.

  1. First, we're going to build a basic script with error handling, retry logic, and our basic parser.
  2. We'll add pagination.
  3. Create a couple of classes and use them to implement data storage.
  4. Add concurrency to scrape multiple pages simultaneously.
  5. Integrate with the ScrapeOps Proxy API in order to get past even the most stringent of anti-bot protocols.

Step 1: Create Simple Search Data Parser

Time to begin. We're going to start with a script that sets up our overall structure and adds error handling, retry logic, and a basic parsing function. This will lay the foundation for everything else we add later on.

Pay close attention to our parsing function, scrape_search_results().

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



def scrape_search_results(keyword, location, locality, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer="
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")


soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text

search_data = {
"name": company_name,
"job_title": job_title,
"url": job_link,
"location": location
}

print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, LOCALITY)
logger.info(f"Crawl complete.")
  • We use soup.find_all("div", class_="base-search-card__info") to find all of our base result cards.
  • div_card.find("h4", class_="base-search-card__subtitle").text finds our company_name.
  • Our job title is inside an h3, so we use div_card.find("h3", class_="base-search-card__title").text to find it.
  • Our link is actually embedded in the parent element, so we extract it with div_card.parent.find("a").
  • We then pull the href from the link element with link.get("href").
  • Finally, div_card.find("span", class_="job-search-card__location").text gets the job location from the card.

Step 2: Add Pagination

As mentioned earlier, adding pagination is very simple. We just need to add start={page_number*10} to the end of our URL. We also need a function that allows us to scrape multiple pages; we'll call it start_scrape().

Our fully paginated URLs are laid out in the snippet you see below.

    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"

start_scrape() is in our next snippet. At the moment, it's just a simple for loop that parses pages using iteration. Later on, we'll make some improvements to it.

def start_scrape(keyword, pages, location, locality, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, locality, page, retries=retries)

You can see how it all fits together in the full code below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



def scrape_search_results(keyword, location, locality, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")


soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text

search_data = {
"name": company_name,
"job_title": job_title,
"url": job_link,
"location": location
}

print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
  • start={page_number*10} gives us the ability to control pagination inside our url.
  • start_scrape() allows us to parse a list of pages.

Step 3: Storing the Scraped Data

In order to store our data, we need to write a couple of classes.

  1. Our first one is a dataclass called SearchData.
  2. The second one is our DataPipeline. SearchData simply needs to represent individual search items.

DataPipeline needs to open a pipe to a CSV file and store SearchData objects inside our CSV.

Here is our SearchData. It holds the name, job_title, url and location that we find during the parse.

@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
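Here's a quick sketch of what __post_init__() buys us, assuming the SearchData class above is already defined in your script: empty strings get a readable default and stray whitespace gets stripped.

# Assumes SearchData (above) is in scope.
record = SearchData(name="  Example Corp  ", job_title="", url="https://example.com", location="United States")
print(record.name)       # "Example Corp" -- surrounding whitespace stripped
print(record.job_title)  # "No job_title" -- empty fields get default text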

Once we've got our SearchData, it gets passed into the DataPipeline you see below. The DataPipeline first checks to see if our CSV file exists. If it exists, we append to the file.

If the file doesn't exist, we create one. This approach stops us from accidentally destroying important data. This class also filters out duplicates using the name attribute.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    # Note: close_pipeline() uses time.sleep(), so the full script needs `import time`.
    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
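Before wiring it into the crawler, here is a tiny usage sketch, assuming SearchData and DataPipeline from above are both in scope (the filename example-jobs.csv is just an illustration).

pipeline = DataPipeline(csv_filename="example-jobs.csv")
pipeline.add_data(SearchData(name="Example Corp", job_title="Software Engineer", url="https://example.com", location="United States"))
pipeline.add_data(SearchData(name="Example Corp", job_title="Data Engineer", url="https://example.com", location="United States"))  # dropped: duplicate name
pipeline.close_pipeline()  # flushes whatever is left in the queue to example-jobs.csv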

Putting it all together, we get a script that looks like this.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")


soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • We use SearchData to represent individual results from our search results page.
  • DataPipeline is used to store these objects in a safe and efficient way.

Step 4: Adding Concurrency

To add concurrency support, we're going to use multithreading.

To add multithreading, we're going to use ThreadPoolExecutor and we're going to remove our for loop from start_scrape().

ThreadPoolExecutor allows us to open a pool with max_threads. If we want to use 4 threads, we pass max_threads=4.

def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

Our arguments to executor.map() go as follows:

  • scrape_search_results: the function we want to call on all these available threads.
  • All other arguments get passed in as arrays.
  • These arrays of arguments then get passed into the function we're calling on multiple threads.
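If the argument arrays feel abstract, this toy example (with a made-up greet() function) shows how executor.map() zips them together, handing one element from each list to every call.

import concurrent.futures

def greet(keyword, page):
    return f"{keyword} -> page {page}"

keywords = ["software engineer"] * 3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for result in executor.map(greet, keywords, range(3)):
        print(result)

# software engineer -> page 0
# software engineer -> page 1
# software engineer -> page 2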

Our full code now looks like this.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")


soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

We can now crawl multiple pages simultaneously.


Step 5: Bypassing Anti-Bots

To bypass anti-bots, we're going to write a simple function that takes a url and a location. Along with these, the function will handle some set parameters and spit out a ScrapeOps proxied URL.

Take a look at get_scrapeops_url().

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

Let's unpack our payload.

  • "api_key": our ScrapeOps API key.
  • "url": the url we want to scrape.
  • "country": the country we want to appear in.

Our full production crawler is available below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")


soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

Time to run it in production. We're going to crawl 3 pages using 5 threads. If you're looking for different results, try changing any of the following.

  • MAX_RETRIES: Defines the maximum number of times the script will attempt to retrieve a webpage if the initial request fails (e.g., due to network issues or rate limiting).
  • MAX_THREADS: Sets the maximum number of threads that the script will use concurrently during scraping.
  • PAGES: The number of pages of job listings to scrape for each keyword.
  • LOCATION: The country code or identifier for the region from which job listings should be scraped (e.g., "us" for the United States).
  • LOCALITY: The textual representation of the location where the jobs are being scraped (e.g., "United States").
  • keyword_list: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]).
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

Take a look at our results.

Crawler Performance Terminal

As you can see, we crawled 3 pages in 6.99 seconds. This comes out to an average of 2.33 seconds per page.
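Your numbers will vary with location, thread count, and proxy latency. If you want to time your own runs, a simple pattern like the sketch below works fine (wrapping the crawl from the script above).

import time

start = time.perf_counter()
# ... run the crawl here, e.g. start_scrape(...) from the script above ...
elapsed = time.perf_counter() - start
print(f"Crawl finished in {elapsed:.2f} seconds")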


Build A LinkedIn Jobs Scraper

Now it's time to build our scraper. It needs to read the CSV file produced by our crawl, parse each job posting listed in the file, and store the parsed data. Once it can do all of that, we'll add concurrency and proxy support.


Step 1: Create Simple Job Data Parser

Like we did earlier, we'll start by writing a simple parsing function with error handling and retry logic. Take a look at process_posting() below, and pay close attention to the parsing logic.

def process_posting(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")

            job_criteria = soup.find_all("li", class_="description__job-criteria-item")
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = {
                "name": row["name"],
                "seniority": seniority,
                "position_type": position_type,
                "job_function": job_function,
                "industry": industry
            }

            print(job_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
  • soup.find_all("li", class_="description__job-criteria-item") finds all of our criteria pieces.
  • The criteria list goes as follows:
    • job_criteria[0]: seniority level
    • job_criteria[1]: position type
    • job_criteria[2]: job function
    • job_criteria[3]: industry
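Here is a toy version of that extraction, using simplified stand-in HTML in the same index order as the list above.

from bs4 import BeautifulSoup

html = """
<ul>
  <li class="description__job-criteria-item">Seniority level Entry level</li>
  <li class="description__job-criteria-item">Employment type Full-time</li>
  <li class="description__job-criteria-item">Job function Engineering</li>
  <li class="description__job-criteria-item">Industries Software Development</li>
</ul>
"""

soup = BeautifulSoup(html, "html.parser")
job_criteria = soup.find_all("li", class_="description__job-criteria-item")
print(job_criteria[0].text.replace("Seniority level", "").strip())   # Entry level
print(job_criteria[1].text.replace("Employment type", "").strip())   # Full-time
print(job_criteria[2].text.replace("Job function", "").strip())      # Engineering
print(job_criteria[3].text.replace("Industries", "").strip())        # Software Development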

Step 2: Loading URLs To Scrape

Without a CSV file to read, our parsing function is pretty useless. We're going to write a function that reads a CSV file and uses a for loop to call process_posting() on each row from the file.

Here is our first iteration of process_results(). Later on, we'll rewrite it and add multithreading support.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_posting(row, location, retries=retries)

In the full code below, our script is now updated to perform a crawl and then scrape the individual job postings.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")


soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed Request, status code: {response.status_code}")

logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")

job_criteria = soup.find_all("li", class_="description__job-criteria-item")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")

job_data = {
"name": row["name"],
"seniority": seniority,
"position_type": position_type,
"job_function": job_function,
"industry": industry
}

print(job_data)
success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_posting(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

We already have a DataPipeline. Storing our data will be very easy at this point. We just need another dataclass. Take a look below at JobData.

Just like our SearchData from earlier, we use it to represent the data we scraped from the page.

@dataclass
class JobData:
    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In our full code below, our parsing function now opens a DataPipeline. Then, instead of printing our parsed data, we create a JobData object out of it and then pass our JobData into the pipeline.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")


soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
if response.status_code != 200:
raise Exception(f"Failed Request, status code: {response.status_code}")

logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

job_criteria = soup.find_all("li", class_="description__job-criteria-item")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")

job_data = JobData(
name=row["name"],
seniority=seniority,
position_type=position_type,
job_function=job_function,
industry=industry
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_posting(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
  • JobData holds the data we pull from the page.
  • DataPipeline takes a JobData object and pipes it to a CSV file; the short sketch below shows the two working together.
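
If you want to see how these two pieces fit together in isolation, here is a minimal sketch. It assumes the JobData and DataPipeline classes defined above are already in scope; the filename and field values are made up purely for illustration.

pipeline = DataPipeline(csv_filename="example-jobs.csv")  # hypothetical output file

job = JobData(
    name="Example Corp",              # made-up company name
    seniority="Mid-Senior level",
    position_type="Full-time",
    job_function="Engineering",
    industry="Software Development"
)

# add_data() drops duplicates (keyed on name) and queues the record;
# close_pipeline() flushes anything still sitting in the queue to the CSV file.
pipeline.add_data(job)
pipeline.close_pipeline()

Any field left as an empty string would be replaced with placeholder text such as "No seniority" by check_string_fields(), so the CSV never contains blank cells.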

Step 4: Adding Concurrency

For concurrency support, we're going to use ThreadPoolExecutor like we did earlier.

Take a look at our refactored version of process_results().

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_posting,
reader,
[location] * len(reader),
[retries] * len(reader)
)

Look at our arguments to executor.map():

  • process_posting: the function we want to call on multiple threads.
  • The remaining arguments are lists: reader supplies a different row for each call, while location and retries are repeated once per row. executor.map() lines these lists up and hands one element from each to every call, as shown in the sketch below.
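
If that zipping behavior is new to you, the following self-contained sketch shows the pattern with a toy function. fake_process() and its inputs are stand-ins invented for this example, not part of the scraper.

import concurrent.futures

def fake_process(row, location, retries):
    # Each call receives one element from each iterable passed to map().
    return f"row={row['url']}, location={location}, retries={retries}"

rows = [{"url": "job-1"}, {"url": "job-2"}, {"url": "job-3"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        fake_process,
        rows,                   # varies per call
        ["us"] * len(rows),     # repeated so every call gets the same location
        [3] * len(rows)         # repeated so every call gets the same retry count
    )
    for result in results:
        print(result)

This is exactly the pattern used above: the reader rows vary from call to call, while location and retries are simply repeated len(reader) times.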

Step 5: Bypassing Anti-Bots

To bypass anti-bots with our scraper, we just need to reuse the get_scrapeops_url() function we wrote at the beginning of our crawler.

We only need to change one line in process_posting(): the GET request.

response = requests.get(get_scrapeops_url(url, location=location))
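
To make the change concrete, the snippet below reproduces what get_scrapeops_url() does to a single job URL. The API key and job posting URL are placeholder values used purely for illustration.

from urllib.parse import urlencode

payload = {
    "api_key": "your-super-secret-api-key",                   # placeholder key
    "url": "https://www.linkedin.com/jobs/view/1234567890",   # made-up job posting URL
    "country": "us",
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))
# The job URL is percent-encoded into the `url` query parameter, so requests only
# ever talks to the ScrapeOps proxy, which fetches the LinkedIn page on our behalf.

Everything else in process_posting() stays the same.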

Our full production code is available below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
import time

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")


soup = BeautifulSoup(response.text, "html.parser")

div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(get_scrapeops_url(url, location=location))
if response.status_code != 200:
raise Exception(f"Failed Request, status code: {response.status_code}")

logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

job_criteria = soup.find_all("li", class_="description__job-criteria-item")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")

job_data = JobData(
name=row["name"],
seniority=seniority,
position_type=position_type,
job_function=job_function,
industry=industry
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_posting,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv"