
How to Scrape LinkedIn Jobs With Selenium

Since 2003, LinkedIn has been a one-stop shop for all sorts of career opportunities. From job postings to professional networking, LinkedIn pretty much has it all. The site has been built specifically to stop scrapers, but if you know what you're doing, you can still collect tons of aggregate data, and this data is extremely valuable.

Today, we'll go through the process of scraping LinkedIn jobs from start to finish.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape LinkedIn Jobs

If you need a scraper but don't have time to read, look no further! You can use our prebuilt scraper.

  1. Make a new project folder and add a config.json file.
  2. Inside of your new config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
  3. Then copy and paste the code below into a Python file.
  4. Run the scraper with python name_of_your_script.py.

Once the scraper has finished running, you'll get a CSV named after your search. This one will contain all of your search data.

You'll also get an individual CSV file for each of the listings from the CSV. These individual files contain more detailed information about each job posting.

import os
import csv
import json
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

options = webdriver.ChromeOptions()
options.add_argument("--headless")


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class JobData:
    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:

        driver = webdriver.Chrome(options=options)

        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")

            if not div_cards:
                driver.save_screenshot("debug.png")
                raise Exception("Page did not load correctly, please check debug.png")

            for div_card in div_cards:
                company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
                print("company name", company_name)
                job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
                parent = div_card.find_element(By.XPATH, "..")
                link = parent.find_element(By.CSS_SELECTOR, "a")
                job_link = link.get_attribute("href")
                location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=location
                )

                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_posting(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=options)
        try:
            driver.get(get_scrapeops_url(url, location=location))

            job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

            job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = JobData(
                name=row["name"],
                seniority=seniority,
                position_type=position_type,
                job_function=job_function,
                industry=industry
            )
            job_pipeline.add_data(job_data)
            job_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_posting,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

You can change any of the following from main to fine-tune your results:

  • MAX_RETRIES: Defines the maximum number of times the script will attempt to retrieve a webpage if the initial request fails (e.g., due to network issues or rate limiting).
  • MAX_THREADS: Sets the maximum number of threads that the script will use concurrently during scraping.
  • PAGES: The number of pages of job listings to scrape for each keyword.
  • LOCATION: The country code or identifier for the region from which job listings should be scraped (e.g., "us" for the United States).
  • LOCALITY: The textual representation of the location where the jobs are being scraped (e.g., "United States").
  • keyword_list: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]).
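For example, a hypothetical run that crawls three pages of two different searches from the UK would only change a few of these values (everything else stays the same as the script above, and the values here are purely illustrative):

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "uk"               # ScrapeOps country code
LOCALITY = "United Kingdom"   # location text sent to LinkedIn
keyword_list = ["software engineer", "data engineer"]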

How To Architect Our LinkedIn Jobs Scraper

To scrape LinkedIn properly, we need two separate scrapers: a search crawler and a job scraper. From a high level, this is a pretty simple process.

  1. Our crawler performs a keyword search. When we get our search results, the crawler saves them all to a CSV file.
  2. After we've finished our crawl, our scraper needs to read the report from the crawler. It will then go through and collect extra data for each job we crawled.

If you perform a search for Software Engineer, the crawler will extract and save all the Software Engineer jobs from the search. Then, the scraper will look up each individual job posting and generate a separate report for each posting it looks up.

This might sound like a daunting task. To simplify it a little more, we need to break it into smaller pieces. Step by step, we need to define exactly how we want to build our crawler. Then, we need to go through the steps needed to build the scraper as well.

Here are the steps to building the crawler:

  1. Write a search results parser to interpret our data.
  2. Add pagination, which gives us more results and finer control over them.
  3. Create some classes for data storage, and then use them to save our parsed results.
  4. Use ThreadPoolExecutor to add support for multithreading and therefore concurrency.
  5. Write a function for proxy integration and use it to bypass LinkedIn's anti-bot system.

Now, take a look at what we need to build the scraper.

  1. Write a parser to pull information from individual job postings.
  2. Give our scraper the ability to read a CSV file.
  3. Add another class for data storage and build the storage into our parsing function.
  4. Add ThreadPoolExecutor to scrape posting data concurrently.
  5. Use our proxy function from earlier to bypass anti-bots.

Understanding How To Scrape LinkedIn Jobs

Before we get started on coding, we need a better high-level understanding of the tasks we need to accomplish.

  • We need to request pages properly.
  • We need to know which data to extract and how to extract it.
  • We also need to know how to control our pagination and our location.

In the sections below, we'll go through all of these steps in greater detail. This way, we'll be able to tell Selenium exactly what to do.


Step 1: How To Request LinkedIn Jobs Pages

Whether you're using a browser or a barebones HTTP client, we always need to start with a GET request.

  1. When you visit LinkedIn from your browser, the browser sends a GET to LinkedIn.
  2. LinkedIn then sends back an HTML response.
  3. The browser will then read the HTML and render the page for you to view.

With our scraper, we don't want to view the rendered page. We want to extract the important data from the HTML. This allows us to search tons of results with great speed and efficiency.

The following URL outlines the format we'll use to obtain our results:

https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=

For the Software Engineer search we mentioned earlier, our URL looks like this:

https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=

If you look closely at our base URL (https://www.linkedin.com/jobs-guest/jobs/api), you might notice something interesting. We're actually making API requests, hence the /api in the endpoint.

Even more interesting, this API endpoint doesn't give us JSON or XML; it sends back straight HTML. In years of web development and scraping, LinkedIn is the only place I've ever seen something like this.
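If you'd like to verify this yourself, a quick one-off check with Python's standard library shows the endpoint answering with HTML. This is purely a sanity check, not part of our scraper, and LinkedIn may well refuse or rate-limit an unproxied request like this:

from urllib.request import urlopen, Request

# Hypothetical one-off check -- LinkedIn may block unproxied requests entirely.
url = "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States"
request = Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urlopen(request) as response:
    print(response.headers.get_content_type())  # expect "text/html"
    print(response.read()[:200])                # raw HTML, not JSON or XML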

The screenshot below gives us a barebones HTML page without any styling whatsoever, but it is in fact a webpage.

Linkedin Job Search HTML

Once we're finished with our search, we'll scrape individual listings. Take a look at the shot below. This is the basic layout for any job posted on LinkedIn. We don't need to worry about the URLs for these pages; we'll be extracting them during our crawl.

Linkedin Job page


Step 2: How To Extract Data From LinkedIn Jobs Results and Pages

We know what our target pages look like. Now we need to know where the data is located inside the HTML. Our search results hold a bunch of div elements.

  • Each one has a class name of base-search-card__info.
  • For individual job pages, we look for li elements with a class of description__job-criteria-item.

In the image below, we inspect one of our search results. As you can see, it's a div. Its class name is base-search-card__info. To extract this data, we need to find each div that matches this class.

Linkedin Job Search HTML Inspection

Here is the type of li element we want to scrape. Each li element is given the class name description__job-criteria-item. So for these, we want to pull all li elements with this class.

Linkedin Job page HTML Inspection


Step 3: How To Control Pagination

When searching large amounts of data, pagination is imperative. Pagination allows us to get our results in batches. We'll have to add one more parameter to our URL: &start={page_number*10}.

Before we look at the full URL for the Software Engineer search, here's how the page numbering works:

  • We use page_number*10 because we begin counting at 0 and each request yields 10 results.
  • Page 0 (0 * 10) gives us results 1 through 10.
  • Page 1 gives us 11 through 20 and so on and so forth.

Look below to see how our fully formatted URL looks:

f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"

Step 4: Geolocated Data

The ScrapeOps Proxy Aggregator lets us control our geolocation for free. The API takes in all sorts of arguments, but the one we want here is called country.

  • If we want to appear in the US, we can pass "country": "us" into the API.
  • If we want to appear in the UK, we'd pass "country": "uk".

You can find a full list of ScrapeOps supported countries here.
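As a rough sketch (the real helper, get_scrapeops_url(), is built in the crawler section below), the country value is just one more key in the query string we send to the proxy:

from urllib.parse import urlencode

# Illustrative payload only -- swap in your real API key and target URL.
payload = {
    "api_key": "your-super-secret-api-key",
    "url": "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+Kingdom",
    "country": "uk",
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))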


Setting Up Our LinkedIn Jobs Scraper Project

Let's get started. We need a new project folder. Then we need to install our dependencies. You can run the following commands to get set up.

Create a New Project Folder

mkdir linkedin-jobs-scraper

cd linkedin-jobs-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install selenium

Make sure you have Chromedriver installed! Nowadays, it comes prepackaged inside of Chrome for Testing. If you don't have Chrome for Testing, you can get it here.
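If you'd like to confirm that Selenium and Chrome are wired up correctly before writing any scraping code, a quick headless smoke test (not part of the scraper itself) should print the page title without errors:

from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
try:
    driver.get("https://www.example.com")
    print(driver.title)  # "Example Domain" if everything is installed correctly
finally:
    driver.quit()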


Build A LinkedIn Jobs Search Crawler

Time to get started on our crawler. We'll use an iterative build process. The steps below outline everything we need to do in order to build the crawler.

  1. First, we're going to build a basic script with error handling, retry logic, and our basic parser.
  2. Next, we'll add pagination.
  3. Once we're getting proper result batches, we need to create a couple classes and use them for data storage.
  4. Then, we'll add concurrency to scrape multiple pages simultaneously.
  5. Finally, we'll use the ScrapeOps Proxy Aggregator to get past any roadblocks that might get in our way.

Step 1: Create Simple Search Data Parser

To start, we need to be able to parse a page. In the code below, we'll write a basic parsing function. This lays the foundation for everything else we build from here on out.

Pay close attention to our parsing function, scrape_search_results().

import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

options = webdriver.ChromeOptions()
options.add_argument("--headless")


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}"
    tries = 0
    success = False

    while tries <= retries and not success:

        driver = webdriver.Chrome(options=options)

        try:
            driver.get(url)

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")

            if not div_cards:
                driver.save_screenshot("debug.png")
                raise Exception("Page did not load correctly, please check debug.png")

            for div_card in div_cards:
                company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
                print("company name", company_name)
                job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
                parent = div_card.find_element(By.XPATH, "..")
                link = parent.find_element(By.CSS_SELECTOR, "a")
                job_link = link.get_attribute("href")
                location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text

                search_data = {
                    "name": company_name,
                    "job_title": job_title,
                    "url": job_link,
                    "location": location
                }

                print(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        scrape_search_results(keyword, LOCATION, LOCALITY, retries=MAX_RETRIES)

    logger.info(f"Crawl complete.")
  • First, we create our Selenium options. Early in the script we set our options with options = webdriver.ChromeOptions(). Then we use options.add_argument("--headless") to set our browser to headless mode.
  • driver = webdriver.Chrome(options=options) launches Selenium with our custom options.
  • We use driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") to find all of our base result cards.
  • company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text finds our company_name.
  • Our job title is inside an h3, so we use div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text to find it.
  • Next, we find the parent of the div_card: div_card.find_element(By.XPATH, ".."). We use the XPATH and pass in .. to find the parent.
  • Our link is actually embedded in the parent element, so we extract it with parent.find_element(By.CSS_SELECTOR, "a").
  • We then pull the href from the link element with link.get_attribute("href").
  • Finally, div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text gets the job location from the card.

Step 2: Add Pagination

As we mentioned earlier in this article, pagination is pretty simple. We just append start={page_number*10} to the end of our URL. We also need an additional function to scrape multiple pages. We'll call it start_scrape().

Our fully paginated urls are laid out in the snippet you see below.

    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"

start_scrape() is in our next snippet. At the moment, it's just a simple for loop that parses each page in turn. Later on, we'll make some improvements to it.

def start_scrape(keyword, pages, location, locality, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, locality, page, retries=retries)

Take a look below and you'll see how everything fits together.

import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

options = webdriver.ChromeOptions()
options.add_argument("--headless")


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



def scrape_search_results(keyword, location, locality, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False


while tries <= retries and not success:

driver = webdriver.Chrome(options=options)

try:
driver.get(url)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")

if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")

for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text

search_data = {
"name": company_name,
"job_title": job_title,
"url": job_link,
"location": location
}

print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
  • start={page_number*10} controls our pagination.
  • With start_scrape(), we can parse a list of pages.

Step 3: Storing the Scraped Data

To store our data, we need to write some classes. Our first one is a dataclass called SearchData. The second one is our DataPipeline.

  • SearchData simply needs to represent individual search items.
  • DataPipeline needs to open a pipe to a CSV file and store SearchData objects inside our CSV.

Here is our SearchData. It holds the name, job_title, url and location that we find during the parse.

@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Once we've got our SearchData, we pass it into the DataPipeline you see below.

  • Our DataPipeline first checks to see if our CSV file exists.
    • If it exists, we append to it.
    • If the file doesn't exist, we create one.

This approach stops us from accidentally destroying important data. This class also filters out duplicates using the name attribute.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)  # requires "import time" at the top of the script
        if len(self.storage_queue) > 0:
            self.save_to_csv()

Our newest iteration looks like this.

import os
import csv
import json
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

options = webdriver.ChromeOptions()
options.add_argument("--headless")


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False


while tries <= retries and not success:

driver = webdriver.Chrome(options=options)

try:
driver.get(url)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")

if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")

for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
  • We use SearchData to represent individual results from our search results page.
  • DataPipeline is used to store these objects in a safe and efficient way.

Step 4: Adding Concurrency

When we add concurrency support, we use Python's built-in multithreading. To add it, we're going to use ThreadPoolExecutor and remove the for loop from start_scrape().

ThreadPoolExecutor allows us to open a pool with max_threads. If we want to use 4 threads, we pass max_threads=4.

def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )

Our arguments to executor.map() go as follows:

  • scrape_search_results: the function we want to call on all these available threads.
  • All other arguments get passed in as arrays.
  • These arrays of arguments then get passed into the function we're calling on multiple threads.
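If the array-of-arguments pattern looks strange, here is a toy example (nothing to do with LinkedIn) showing how executor.map() lines the lists up, taking one element from each list per call:

import concurrent.futures

def add(a, b):
    return a + b

numbers = [1, 2, 3]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Runs add(1, 10), add(2, 10), add(3, 10) across the thread pool
    results = list(executor.map(add, numbers, [10] * len(numbers)))

print(results)  # [11, 12, 13]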

Our full code now looks like this.

import os
import csv
import json
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

options = webdriver.ChromeOptions()
options.add_argument("--headless")


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False


while tries <= retries and not success:

driver = webdriver.Chrome(options=options)

try:
driver.get(url)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")

if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")

for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

We can now crawl multiple pages simultaneously.


Step 5: Bypassing Anti-Bots

To get past anti-bots, we're going to write a special function. This function takes in a URL and our ScrapeOps parameters, then wraps everything into a single proxied URL with some simple string formatting and URL encoding.

Take a look at get_scrapeops_url().

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

Let's unpack our payload.

  • "api_key": our ScrapeOps API key.
  • "url": the url we want to scrape.
  • "country": the country we want to appear in.

Our full production crawler is available below.

import os
import csv
import json
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

options = webdriver.ChromeOptions()
options.add_argument("--headless")



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False


while tries <= retries and not success:

driver = webdriver.Chrome(options=options)

try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")

if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")

for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 6: Production Run

For our production run, we'll use 5 threads. The crawl itself won't need all five, but we'll make full use of these 5 threads later on in our scraper.

If you're looking for different results, try changing any of the following.

  • MAX_RETRIES
  • MAX_THREADS
  • PAGES
  • LOCATION
  • LOCALITY
  • keyword_list
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

Take a look at our results.

Crawler Performance

As you can see, we crawled 3 pages in 33.694 seconds. This comes out to an average of 11.231 seconds per page.


Build A LinkedIn Jobs Scraper

Now, we're on to the second part of our project. We have our crawler that scrapes and saves search results.

Next, we need a scraper that reads those results. After reading those results, it needs to go through and scrape individual details about each job posting.


Step 1: Create Simple Job Data Parser

Just like earlier, we'll get started by writing our parsing function. This function will have error handling and retry logic just like before.

Take a look at process_posting(). Like before, pay close attention to our parsing logic.

def process_posting(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=options)
        try:
            driver.get(url)

            job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = {
                "name": row["name"],
                "seniority": seniority,
                "position_type": position_type,
                "job_function": job_function,
                "industry": industry
            }
            print(job_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
  • driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") finds all the items from our criteria list.
  • The criteria list goes as follows:
    • job_criteria[0]: seniority level
    • job_criteria[1]: position type
    • job_criteria[2]: job function
    • job_criteria[3]: industry
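Since we index straight into that list, it can be worth guarding against postings that load without all four criteria. This is an optional, hypothetical tweak, not part of the code above:

# Hypothetical guard, placed right after the find_elements() call
if len(job_criteria) < 4:
    driver.save_screenshot("debug-job.png")
    raise Exception("Job criteria did not load fully, please check debug-job.png")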

Step 2: Loading URLs To Scrape

To use our new parsing function, we need to feed it a URL. This URL will come from the CSV file generated by our crawler. We'll read that file and use a for loop to scrape details from every posting we found.

Here is our first iteration of process_results(). Later on, we'll rewrite it and add multithreading support.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_posting(row, location, retries=retries)

The full code below now performs a crawl and then scrapes the individual job postings.

import os
import csv
import json
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

options = webdriver.ChromeOptions()
options.add_argument("--headless")



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False


while tries <= retries and not success:

driver = webdriver.Chrome(options=options)

try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")

if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")

for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
driver.get(url)

job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")

job_data = {
"name": row["name"],
"seniority": seniority,
"position_type": position_type,
"job_function": job_function,
"industry": industry
}
print(job_data)
success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_posting(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

We can store basically anything we want with our DataPipeline. We just need another dataclass. Take a look below at JobData. Just like our SearchData from earlier, we use it to represent the data we scraped from the page.

We'll pass this into our DataPipeline which will then pipe our data into a CSV file.

@dataclass
class JobData:
    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

In our full code below, our parsing function now opens a DataPipeline. Then, instead of printing our parsed data, we create a JobData object out of it and then pass our JobData into the pipeline.

import os
import csv
import json
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

options = webdriver.ChromeOptions()
options.add_argument("--headless")



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False


while tries <= retries and not success:

driver = webdriver.Chrome(options=options)

try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")

if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")

for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
driver.get(url)

job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")

job_data = JobData(
name=row["name"],
seniority=seniority,
position_type=position_type,
job_function=job_function,
industry=industry
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_posting(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
  • JobData holds the data we pull from the page.
  • DataPipeline takes a JobData object and pipes it to a CSV file.
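
As a quick illustration of how the two fit together, the sketch below (assuming the classes above are defined; the filename and values are just examples) writes a couple of postings to a CSV and silently drops a duplicate:

# Minimal sketch: pipe a few JobData records into a DataPipeline.
# Assumes JobData and DataPipeline from the code above; "example-jobs.csv" is just a sample filename.
pipeline = DataPipeline(csv_filename="example-jobs.csv")

pipeline.add_data(JobData(name="DataCorp", seniority="Mid-Senior level"))
pipeline.add_data(JobData(name="DataCorp", seniority="Entry level"))  # duplicate name, dropped with a warning
pipeline.add_data(JobData(name="Initech", position_type="Full-time"))

pipeline.close_pipeline()  # flushes whatever is still in the queue to example-jobs.csv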

Step 4: Adding Concurrency

We're going to use ThreadPoolExecutor for concurrency just like we did earlier.

Take a look at our refactored version of process_results().

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_posting,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )

Look at the arguments we pass to executor.map():

  • process_posting: the function we want to run on multiple threads.
  • Every other argument to process_posting is passed in as a list, with one element per row we want to process (see the small example below).
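
If the list-of-arguments pattern looks strange, here's a tiny standalone example (the greet() function is hypothetical, not part of the scraper) showing how executor.map() lines the iterables up into individual calls:

import concurrent.futures

def greet(name, retries):
    # executor.map zips the iterables together, so this runs
    # greet("alice", 3), greet("bob", 3), greet("carol", 3) on separate threads.
    return f"{name} (retries={retries})"

names = ["alice", "bob", "carol"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for result in executor.map(greet, names, [3] * len(names)):
        print(result)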

Step 5: Bypassing Anti-Bots

We're just about ready to run in production. However, there's still one thing we need to add: proxy support. All it takes is wrapping our URL with get_scrapeops_url() before handing it to driver.get():

driver.get(get_scrapeops_url(url, location=location))
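
If you want to see exactly what the driver ends up requesting, you can print the wrapped URL (a quick check that assumes get_scrapeops_url() from the code above is in scope; the job URL is just a placeholder):

# Print the proxied URL to confirm the request is routed through ScrapeOps.
# Assumes get_scrapeops_url() is defined as above; the job URL below is a placeholder.
wrapped_url = get_scrapeops_url("https://www.linkedin.com/jobs/view/example-job", location="us")
print(wrapped_url)
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.linkedin.com%2Fjobs%2Fview%2Fexample-job&country=us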

Below is the final code for our scraper.

import os
import csv
import json
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

options = webdriver.ChromeOptions()
options.add_argument("--headless")



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False


while tries <= retries and not success:

driver = webdriver.Chrome(options=options)

try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)

div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")

if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")

for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text

search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)


def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
driver.get(get_scrapeops_url(url, location=location))

job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")

job_data = JobData(
name=row["name"],
seniority=seniority,
position_type=position_type,
job_function=job_function,
industry=industry
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_posting,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Here, we'll run a full crawl and scrape. Once again, we'll set our PAGES to 3 and our MAX_THREADS to 5.

If you need a refresher on our main, you can see it again below.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Here are the results.

[Screenshot: scraper performance results]

If you remember, our 3-page crawl took 33.694 seconds and produced a CSV with 20 results. The full crawl and scrape took 155.813 seconds in total. 155.813 - 33.694 = 122.119 seconds spent scraping the individual postings. 122.119 seconds / 20 results = 6.106 seconds per result.

At roughly 6.1 seconds per posting, we're scraping individual pages almost twice as fast as we crawled the search results (about 11.2 seconds per page of results)!
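
If you'd like to reproduce the math, it's a two-line calculation:

# Back-of-the-envelope timing check using the numbers from the run above.
total_seconds = 155.813
crawl_seconds = 33.694
results = 20

scrape_seconds = total_seconds - crawl_seconds   # 122.119
print(round(scrape_seconds / results, 3))        # 6.106 seconds per result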


Legal and Ethical Considerations

Scraping private data without explicit permission is almost always illegal. When we scrape LinkedIn jobs, we don't log in and we only collect publicly available data. You should do the same.

If you think your scraper is legally questionable, you need to consult an attorney.

In addition to any legal ramifications from scraping, we're subject to LinkedIn's terms of service and their robots.txt. Their terms are available here and their robots.txt is here.

As stated at the top of their robots.txt, crawling LinkedIn is explicitly prohibited. By scraping LinkedIn, you can have your account suspended, banned, or even deleted.


Conclusion

It takes a bit more finesse than other scraping jobs, but it's entirely possible to scrape job postings from LinkedIn, and you've now seen it with your own eyes.

By now, you should have a basic understanding of Selenium and how to parse pages with it. You should also have a solid understanding of how to iteratively build out features such as parsing, pagination, data storage, concurrency, and proxy integration.

If you want to know more about the tech stack from this article, check out the links below!


More Python Web Scraping Guides

At ScrapeOps, we wrote the playbook on scraping with Selenium. Whether you're brand new to scraping or an experienced dev, we've got something for you.

If you'd like to read more of our "How To Scrape" series, take a look at the links below.