How to Scrape LinkedIn Jobs With Selenium
Since 2003, LinkedIn has been a one-stop shop for all sorts of career opportunities. From job postings to professional networking, LinkedIn pretty much has it all. The site is built specifically to stop scrapers, but if you know what you're doing, you can gather tons of aggregate data, and this data is extremely valuable.
Today, we'll go through the process of scraping LinkedIn jobs from start to finish.
- TLDR: How to Scrape LinkedIn Jobs
- How To Architect Our Scraper
- Understanding How To Scrape LinkedIn Jobs
- Setting Up Our LinkedIn Jobs Scraper
- Build A LinkedIn Jobs Search Crawler
- Build A LinkedIn Jobs Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape LinkedIn Jobs
If you need a scraper but don't have time to read, look no further! You can use our prebuilt scraper.
- Make a new project folder and add a `config.json` file.
- Inside of your new config file, add your ScrapeOps API key: `{"api_key": "your-super-secret-api-key"}`.
- Then copy and paste the code below into a Python file.
- Run the scraper with `python name_of_your_script.py`.
Once the scraper has finished running, you'll get a CSV named after your search. This one will contain all of your search data.
You'll also get an individual CSV file for each listing found during the crawl. These individual files contain more detailed information about each job posting.
import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
options = webdriver.ChromeOptions()
options.add_argument("--headless")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")
for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
driver.get(get_scrapeops_url(url, location=location))
job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")
job_data = JobData(
name=row["name"],
seniority=seniority,
position_type=position_type,
job_function=job_function,
industry=industry
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_posting,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
You can change any of the following from `main` to fine-tune your results (an example configuration follows this list):

- `MAX_RETRIES`: Defines the maximum number of times the script will attempt to retrieve a webpage if the initial request fails (e.g., due to network issues or rate limiting).
- `MAX_THREADS`: Sets the maximum number of threads that the script will use concurrently during scraping.
- `PAGES`: The number of pages of job listings to scrape for each keyword.
- `LOCATION`: The country code or identifier for the region from which job listings should be scraped (e.g., "us" for the United States).
- `LOCALITY`: The textual representation of the location where the jobs are being scraped (e.g., "United States").
- `keyword_list`: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]).
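For example (an illustrative sketch, not part of the script above), a run that crawls three pages each for two searches, routed through the UK, only changes the configuration values in `main`; everything else stays the same:

```python
# Illustrative configuration values -- swap these into the script above as needed.
MAX_RETRIES = 3                 # retry each failed page up to 3 times
MAX_THREADS = 5                 # process up to 5 pages at once
PAGES = 3                       # pages of search results per keyword
LOCATION = "uk"                 # ScrapeOps country code for the proxy
LOCALITY = "United Kingdom"     # location text passed to LinkedIn
keyword_list = ["software engineer", "data engineer"]
```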
How To Architect Our LinkedIn Jobs Scraper
To scrape LinkedIn properly, we need two separate scrapers: a search crawler and a job scraper. From a high level, this is a pretty simple process.
- Our crawler performs a keyword search. When we get our search results, the crawler saves them all to a CSV file.
- After we've finished our crawl, our scraper needs to read the report from the crawler. It will then go through and collect extra data for each job we crawled.
If you perform a search for Software Engineer, the crawler will extract and save all the Software Engineer jobs from the search. Then, the scraper will look up each individual job posting and generate a separate report for each one.
This might sound like a daunting task. To simplify it a little more, we need to break it into smaller pieces. Step by step, we need to define exactly how we want to build our crawler. Then, we need to go through the steps needed to build the scraper as well.
Here are the steps to building the crawler:
- Write a search results parser to interpret our data.
- Add pagination so we get our results in batches and finer control over them.
- Create some classes for data storage, and then use them to save our parsed results.
- Use `ThreadPoolExecutor` to add support for multithreading and therefore concurrency.
- Write a function for proxy integration and use it to bypass LinkedIn's anti-bot system.
Now, take a look at what we need to build the scraper.
- Write a parser to pull information from individual job postings.
- Give our scraper the ability to read a CSV file.
- Add another class for data storage and build the storage into our parsing function.
- Add `ThreadPoolExecutor` to scrape posting data concurrently.
- Use our proxy function from earlier to bypass anti-bots.
Understanding How To Scrape LinkedIn Jobs
Before we get started on coding, we need to get a better high level understanding of the tasks we need to accomplish.
- We need to request pages properly.
- We need to know which data to extract and how to extract it.
- We also need to know how to control our pagination and our location.
In the sections below, we'll go through all of these steps in greater detail. This way, we'll be able to tell Selenium exactly what to do.
Step 1: How To Request LinkedIn Jobs Pages
Whether you're using a browser or a barebones HTTP client, we always need to start with a GET request.
- When you visit LinkedIn from your browser, the browser sends a GET to LinkedIn.
- LinkedIn then sends back an HTML response.
- The browser will then read the HTML and render the page for you to view.
With our scraper, we don't want to view the rendered page. We want to extract the important data from the HTML. This allows us to search tons of results with great speed and efficiency.
The following URL outlines the format we'll use to obtain our results:
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=
For the Software Engineer search we mentioned earlier, our URL looks like this:
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=
If you look closely at our base URL (`https://www.linkedin.com/jobs-guest/jobs/api`), you might notice something interesting: we're actually making API requests, hence the `/api` endpoint.

Even more interesting, this API endpoint doesn't give us JSON or XML; it sends back straight HTML. In years of web development and scraping, LinkedIn is the only place I've ever seen something like this.
The screenshot below gives us a barebones HTML page without any styling whatsoever, but it is in fact a webpage.
Once we're finished with our search, we'll scrape individual listings. Take a look at the screenshot below. This is the basic layout for any job posted on LinkedIn. We don't need to worry about the URLs for these pages; we'll be extracting them during our crawl.
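Before building anything bigger, it can help to confirm the endpoint behaves as described. The minimal sketch below (no proxy yet, so LinkedIn may block repeated requests) fetches one page of search results with headless Chrome and counts the result cards using the same selector we'll rely on later:

```python
# Minimal sketch: request the guest jobs endpoint and count result cards.
# No proxy is used here, so repeated requests may get blocked by LinkedIn.
from selenium import webdriver
from selenium.webdriver.common.by import By

options = webdriver.ChromeOptions()
options.add_argument("--headless")

keyword = "software engineer".replace(" ", "+")
locality = "United States".replace(" ", "+")
url = (
    "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search"
    f"?keywords={keyword}&location={locality}&original_referer="
)

driver = webdriver.Chrome(options=options)
try:
    driver.get(url)
    cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
    print(f"Found {len(cards)} job cards")  # 0 usually means the page didn't load correctly
finally:
    driver.quit()
```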
Step 2: How To Extract Data From LinkedIn Jobs Results and Pages
We know what our target pages look like. Now we need to know where the data is located inside the HTML. Our search results hold a bunch of `div` elements.

- Each one has a class name of `base-search-card__info`.
- For individual job pages, we look for `li` elements with a class of `description__job-criteria-item`.

In the image below, we inspect one of our search results. As you can see, it's a `div` with the class name `base-search-card__info`. To extract this data, we need to find each `div` that matches this class.

Here is the type of `li` element we want to scrape. Each `li` element is given the class name `description__job-criteria-item`, so we want to pull all `li` elements with this class.
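To make the two selectors concrete, here's a condensed sketch (helper functions of our own, not code from the final scraper) that pulls the fields from each search card and the criteria text from a job posting, assuming `driver` already has the relevant page loaded:

```python
from selenium.webdriver.common.by import By

def extract_search_cards(driver):
    # Each result card is a div.base-search-card__info; the wrapping parent holds the job link.
    results = []
    for card in driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']"):
        parent = card.find_element(By.XPATH, "..")
        results.append({
            "name": card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text,
            "job_title": card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text,
            "url": parent.find_element(By.CSS_SELECTOR, "a").get_attribute("href"),
            "location": card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text,
        })
    return results

def extract_job_criteria(driver):
    # On an individual posting, each criteria item is an li.description__job-criteria-item.
    items = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
    return [item.text for item in items]
```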
Step 3: How To Control Pagination
When searching large amounts of data, pagination is imperative. Pagination allows us to get our results in batches. We'll have to add one more parameter to our URL: `&start={page_number*10}`.

For page 1 of the Software Engineer search, we'd append `&start=10` to the URL we built earlier.

- We use `page_number*10` because we begin counting at 0 and each request yields 10 results.
- Page 0 (0 * 10) gives us results 1 through 10.
- Page 1 gives us results 11 through 20, and so on.
Look below to see how our fully formatted URL looks:
f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
Step 4: Geolocated Data
The ScrapeOps Proxy Aggregator lets us control our geolocation for free. The API takes in all sorts of arguments, but the one we want is called `country`.

- If we want to appear in the US, we can pass `"country": "us"` into the API.
- If we want to appear in the UK, we'd pass `"country": "uk"`.
You can find a full list of ScrapeOps supported countries here.
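In practice, the only thing that changes between regions is the `country` value in the payload we send to the proxy. A small sketch (with a placeholder API key) to illustrate:

```python
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder -- use the key from config.json

def proxied_url(url, country):
    # country tells ScrapeOps which region to route the request through
    payload = {"api_key": API_KEY, "url": url, "country": country}
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

target = "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer"
print(proxied_url(target, "us"))  # routed through the US
print(proxied_url(target, "uk"))  # routed through the UK
```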
Setting Up Our LinkedIn Jobs Scraper Project
Let's get started. We need a new project folder. Then we need to install our dependencies. You can run the following commands to get set up.
Create a New Project Folder
mkdir linkedin-jobs-scraper
cd linkedin-jobs-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install selenium
Make sure you have Chromedriver installed! Nowadays, it comes prepackaged inside of Chrome for Testing. If you don't have Chrome for Testing, you can get it here.
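If you want to confirm Selenium can talk to your Chrome for Testing install before moving on, a quick sanity check like this should open and close a headless browser without errors:

```python
# Sanity check: launch headless Chrome, load a page, print the title, then quit.
from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
try:
    driver.get("https://www.example.com")
    print("Selenium is working, page title:", driver.title)
finally:
    driver.quit()
```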
Build A LinkedIn Jobs Search Crawler
Time to get started on our crawler. We'll use an iterative build process. The steps below outline everything we need to do in order to build the crawler.
- First, we're going to build a basic script with error handling, retry logic, and our basic parser.
- Next, we'll add pagination.
- Once we're getting proper result batches, we need to create a couple classes and use them for data storage.
- Then, we'll add concurrency to scrape multiple pages simultaneously.
- Finally, we'll use the ScrapeOps Proxy Aggregator to get past any roadblocks that might get in our way.
Step 1: Create Simple Search Data Parser
To start, we need to be able to parse a page. In the code below, we'll write a basic parsing function. This lays the foundation for everything else we build from here on out.
Pay close attention to our parsing function, `scrape_search_results()`.
import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
options = webdriver.ChromeOptions()
options.add_argument("--headless")
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, locality, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
driver.get(url)
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")
for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text
search_data = {
"name": company_name,
"job_title": job_title,
"url": job_link,
"location": location
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
scrape_search_results(keyword, LOCATION, LOCALITY, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- First, we create our Selenium options. Early in the script we set our options with `options = webdriver.ChromeOptions()`, then we use `options.add_argument("--headless")` to set our browser to headless mode.
- `driver = webdriver.Chrome(options=options)` launches Selenium with our custom options.
- We use `driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")` to find all of our base result cards.
- `company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text` finds our `company_name`.
- Our job title is inside an `h3`, so we use `div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text` to find it.
- Next, we find the `parent` of the `div_card` with `div_card.find_element(By.XPATH, "..")`. We use `XPATH` and pass in `..` to find the parent.
- Our link is actually embedded in the parent element, so we extract it with `parent.find_element(By.CSS_SELECTOR, "a")`.
- We then pull the `href` from the link element with `link.get_attribute("href")`.
- Finally, `div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text` gets the job location from the card.
Step 2: Add Pagination
As we mentioned earlier in this article, pagination is pretty simple: we just add `start={page_number*10}` to the end of our URL. We also need an additional function to scrape multiple pages; we'll call it `start_scrape()`.

Our fully paginated URLs are laid out in the snippet you see below.
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
`start_scrape()` is in our next snippet. At the moment, it's just a simple `for` loop that parses pages using iteration. Later on, we'll make some improvements to it.
def start_scrape(keyword, pages, location, locality, retries=3):
    for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)
Take a look below and you'll see how everything fits together.
import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
options = webdriver.ChromeOptions()
options.add_argument("--headless")
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, locality, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
driver.get(url)
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")
for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text
search_data = {
"name": company_name,
"job_title": job_title,
"url": job_link,
"location": location
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, retries=3):
    for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- `start={page_number*10}` controls our pagination.
- With `start_scrape()`, we can parse a list of pages.
Step 3: Storing the Scraped Data
To store our data, we need to write some classes. Our first one is a `dataclass` called `SearchData`. The second one is our `DataPipeline`.

- `SearchData` simply needs to represent individual search items.
- `DataPipeline` needs to open a pipe to a CSV file and store `SearchData` objects inside our CSV.

Here is our `SearchData`. It holds the `name`, `job_title`, `url` and `location` that we find during the parse.
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Once we've got our `SearchData`, we pass it into the `DataPipeline` you see below.

- Our `DataPipeline` first checks to see if our CSV file exists.
- If it exists, we append to the file.
- If the file doesn't exist, we create one.

This approach stops us from accidentally destroying important data. This class also filters out duplicates using the `name` attribute.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
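Here's a quick usage sketch (assuming the `SearchData` and `DataPipeline` definitions above, plus the imports from the full script): items are queued as they're added, duplicates are dropped by `name`, and anything still in the queue is flushed to the CSV when the pipeline closes.

```python
# Usage sketch -- assumes SearchData and DataPipeline are defined as above.
pipeline = DataPipeline(csv_filename="example-jobs.csv")
pipeline.add_data(SearchData(name="Acme Corp", job_title="Software Engineer",
                             url="https://example.com/job/1", location="United States"))
# A second item with the same name is treated as a duplicate and dropped.
pipeline.add_data(SearchData(name="Acme Corp", job_title="Backend Engineer",
                             url="https://example.com/job/2", location="United States"))
pipeline.close_pipeline()  # flushes the queue to example-jobs.csv
```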
Our newest iteration looks like this.
import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
options = webdriver.ChromeOptions()
options.add_argument("--headless")
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
driver.get(url)
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")
for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3):
    for page in range(pages):
scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- We use `SearchData` to represent individual results from our search results page.
- `DataPipeline` is used to store these objects in a safe and efficient way.
Step 4: Adding Concurrency
To add concurrency, we use Python's built-in multithreading via `ThreadPoolExecutor`, and we remove the `for` loop from `start_scrape()`.

`ThreadPoolExecutor` opens a pool of worker threads; we pass our `max_threads` value in as `max_workers`. If we want to use 4 threads, we pass `max_threads=4`.
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
Our arguments to `executor.map()` go as follows:

- `scrape_search_results`: the function we want to call on all the available threads.
- All other arguments get passed in as arrays (lists).
- These arrays of arguments then get passed into the function we're calling on multiple threads.
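If the argument-array pattern looks unusual, here's a tiny standalone example (unrelated to LinkedIn) showing how `executor.map()` pairs the lists up position by position, one call per position:

```python
import concurrent.futures

def greet(name, language):
    return f"{language}: hello {name}"

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # greet("alice", "python") and greet("bob", "go") run on separate threads
    results = executor.map(greet, ["alice", "bob"], ["python", "go"])
    print(list(results))  # ['python: hello alice', 'go: hello bob']
```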
Our full code now looks like this.
import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
options = webdriver.ChromeOptions()
options.add_argument("--headless")
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
driver.get(url)
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")
for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
We can now crawl multiple pages simultaneously.
Step 5: Bypassing Anti-Bots
To get past anti-bots, we're going to write a special function. This function takes in a URL and our ScrapeOps parameters, then wraps them all into a single proxied URL with some simple string formatting and URL encoding.

Take a look at `get_scrapeops_url()`.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
Let's unpack our `payload`:

- `"api_key"`: our ScrapeOps API key.
- `"url"`: the URL we want to scrape.
- `"country"`: the country we want to appear in.
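A quick way to see what the wrapper produces (assuming the `get_scrapeops_url()` function and `API_KEY` above) is to print a proxied URL; the target URL gets percent-encoded into the `url` parameter:

```python
# Usage sketch -- assumes get_scrapeops_url() and API_KEY are defined as above.
target = "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=&start=0"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.linkedin.com%2F...&country=us
```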
Our full production crawler is available below.
import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
options = webdriver.ChromeOptions()
options.add_argument("--headless")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")
for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
For our production run, we'll use 5 threads. Since we're only crawling 3 pages here, we won't make use of all five, but we'll make full use of them later on in our scraper.
If you're looking for different results, try changing any of the following.
- `MAX_RETRIES`
- `MAX_THREADS`
- `PAGES`
- `LOCATION`
- `LOCALITY`
- `keyword_list`
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Take a look at our results.
As you can see, we crawled 3 pages in 33.694 seconds. This comes out to an average of 11.231 seconds per page.
Build A LinkedIn Jobs Scraper
Now, we're on to the second part of our project. We have our crawler that scrapes and saves search results.
Next, we need a scraper that reads those results. After reading those results, it needs to go through and scrape individual details about each job posting.
Step 1: Create Simple Job Data Parser
Just like earlier, we'll get started by writing our parsing function. This function will have error handling and retry logic just like before.
Take a look at `process_posting()`. Like before, pay close attention to our parsing logic.
def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
            driver.get(url)
job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")
job_data = {
"name": row["name"],
"seniority": seniority,
"position_type": position_type,
"job_function": job_function,
"industry": industry
}
print(job_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
- `driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")` finds all the items from our criteria list.
- The criteria list goes as follows:
  - `job_criteria[0]`: seniority level
  - `job_criteria[1]`: position type
  - `job_criteria[2]`: job function
  - `job_criteria[3]`: industry
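That indexing assumes every posting lists all four criteria items. If you want a more forgiving version, a sketch like the one below (our own helper, not the article's final code) falls back to an empty string when an item is missing:

```python
from selenium.webdriver.common.by import By

def parse_job_criteria(driver):
    # Sketch: read the criteria list defensively in case a posting omits an item.
    labels = ["Seniority level", "Employment type", "Job function", "Industries"]
    items = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
    values = []
    for index, label in enumerate(labels):
        text = items[index].text if index < len(items) else ""
        values.append(text.replace(label, "").strip())
    seniority, position_type, job_function, industry = values
    return seniority, position_type, job_function, industry
```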
Step 2: Loading URLs To Scrape
To use our new parsing function, we need to feed it a URL. Each URL comes from the CSV file generated by our crawler. We'll read the file and use a `for` loop to scrape details from every posting we found.

Here is our first iteration of `process_results()`. Later on, we'll rewrite it and add multithreading support.
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_posting(row, location, retries=retries)
In the full code below, our script is now updated to perform a crawl and then scrape the individual job postings.
import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
options = webdriver.ChromeOptions()
options.add_argument("--headless")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")
for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
            driver.get(url)
job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")
job_data = {
"name": row["name"],
"seniority": seniority,
"position_type": position_type,
"job_function": job_function,
"industry": industry
}
print(job_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_posting(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
We can store basically anything we want with our `DataPipeline`; we just need another `dataclass`. Take a look below at `JobData`. Just like our `SearchData` from earlier, we use it to represent the data we scraped from the page.

We'll pass this into our `DataPipeline`, which will then pipe our data into a CSV file.
@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
In our full code below, our parsing function now opens a `DataPipeline`. Then, instead of printing our parsed data, we create a `JobData` object and pass it into the pipeline.
import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
import time
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
options = webdriver.ChromeOptions()
options.add_argument("--headless")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")
for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
            driver.get(url)  # Selenium's get() only takes the URL; proxy support gets added in Step 5
job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")
job_data = JobData(
name=row["name"],
seniority=seniority,
position_type=position_type,
job_function=job_function,
industry=industry
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_posting(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- `JobData` holds the data we pull from the page.
- `DataPipeline` takes a `JobData` object and pipes it to a CSV file (see the short sketch below).
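If you'd like to see the two classes working together outside of the scraper, here's a minimal sketch with made-up values. The second `add_data()` call is dropped because `DataPipeline` deduplicates on the `name` field:
pipeline = DataPipeline(csv_filename="example-jobs.csv")
pipeline.add_data(JobData(name="Acme Corp", seniority="Mid-Senior level"))
pipeline.add_data(JobData(name="Acme Corp", seniority="Entry level"))  # duplicate name, dropped with a warning
pipeline.close_pipeline()  # flushes the queue and writes example-jobs.csv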
Step 4: Adding Concurrency
We're going to use `ThreadPoolExecutor` for concurrency, just like we did earlier.
Take a look at our refactored version of `process_results()`.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_posting,
reader,
[location] * len(reader),
[retries] * len(reader)
)
Look at our arguments to `executor.map()`:
- `process_posting`: the function we want to call on multiple threads.
- All arguments to `process_posting` get passed in as lists, one element per call (see the small standalone example below).
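If that argument-mapping pattern is unfamiliar, here's a tiny standalone example (made-up function and values) showing how `executor.map()` zips the lists together, one element per call:
import concurrent.futures

def greet(name, greeting, punctuation):
    return f"{greeting}, {name}{punctuation}"

names = ["Ada", "Grace", "Linus"]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        greet,
        names,                   # first argument for each call
        ["Hello"] * len(names),  # second argument, repeated for every call
        ["!"] * len(names)       # third argument, repeated for every call
    )
print(list(results))  # ['Hello, Ada!', 'Hello, Grace!', 'Hello, Linus!']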
Step 5: Bypassing Anti-Bots
We're just about ready to run in production. However, there is one thing we still need to add: proxy support. We only need to change one line in `process_posting()`.
driver.get(get_scrapeops_url(url, location=location))
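If you're curious what that change actually does, `get_scrapeops_url()` simply URL-encodes the target page into the ScrapeOps proxy endpoint. With a placeholder API key and a made-up job URL, the wrapped URL looks roughly like this:
print(get_scrapeops_url("https://www.linkedin.com/jobs/view/1234", location="us"))
# https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.linkedin.com%2Fjobs%2Fview%2F1234&country=us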
Below is the final code for our scraper.
import os
import csv
import json
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
options = webdriver.ChromeOptions()
options.add_argument("--headless")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
if not div_cards:
driver.save_screenshot("debug.png")
raise Exception("Page did not load correctly, please check debug.png")
for div_card in div_cards:
company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
print("company name", company_name)
job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
parent = div_card.find_element(By.XPATH, "..")
link = parent.find_element(By.CSS_SELECTOR, "a")
job_link = link.get_attribute("href")
location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
driver = webdriver.Chrome(options=options)
try:
driver.get(get_scrapeops_url(url, location=location))
job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")
job_data = JobData(
name=row["name"],
seniority=seniority,
position_type=position_type,
job_function=job_function,
industry=industry
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_posting,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Here, we'll run a full crawl and scrape. Once again, we'll set our `PAGES` to 3 and our `MAX_THREADS` to 5.
If you need a refresher on our `main`, you can see it again below.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here are the results.
If you remember, our 3-page crawl took 33.694 seconds and produced a CSV with 20 results. The full crawl and scrape took 155.813 seconds. 155.813 - 33.694 = 122.119 seconds spent scraping the individual postings. 122.119 seconds / 20 results = 6.106 seconds per result.
We're scraping the individual job pages almost twice as fast as we crawled the search pages!
Legal and Ethical Considerations
Scraping private data without explicit permission is pretty much always illegal. When we scrape LinkedIn jobs, we're not logging in, and we're only scraping publicly available data. You should do the same.
If you think your scraper is legally questionable, you need to consult an attorney.
In addition to any legal ramifications from scraping, we're subject to LinkedIn's terms of service and their `robots.txt`. Their terms are available here and their `robots.txt` is here.
As stated at the top of their `robots.txt`, crawling LinkedIn is explicitly prohibited. By scraping LinkedIn, you can have your account suspended, banned, or even deleted.
Conclusion
It's a bit trickier than other scraping jobs, but scraping job postings from LinkedIn is entirely possible, and you've now seen the whole process with your own eyes.
By now, you should have a basic understanding of Selenium and how to parse pages with it. You should also have a solid understanding of how to iteratively build new features such as parsing, pagination, data storage, concurrency, and proxy integration.
If you want to know more about the tech stack from this article, check out the links below!
More Python Web Scraping Guides
At ScrapeOps, we wrote the playbook on scraping with Selenium. Whether you're brand new or an experienced dev, we've got something for you.
If you'd like to read more of our "How To Scrape" series, take a look at the links below.