How to Scrape LinkedIn Jobs With Requests and BeautifulSoup
Since its founding, LinkedIn has been a go-to social network for all sorts of career professionals. Over time, it became a great place for job postings, and in 2016 it was even acquired by Microsoft. Because of its widespread use, and because it integrates social media with job postings, LinkedIn is a great place to look for a job.
Today, we're going to crawl LinkedIn's job postings and then scrape the individual pages for those listings. This is a great way to collect large amounts of data and get a larger picture of the market for certain jobs.
- TLDR: How to Scrape LinkedIn Jobs
- How To Architect Our Scraper
- Understanding How To Scrape LinkedIn Jobs
- Setting Up Our LinkedIn Jobs Scraper
- Build A LinkedIn Jobs Search Crawler
- Build A LinkedIn Jobs Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
TLDR - How to Scrape LinkedIn Jobs
Looking to scrape LinkedIn jobs? Look no further!
To use our prebuilt scraper:
- Make a new project folder with a config.json file.
- Inside the config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
- Then copy and paste the code below into a Python file.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code != 200:
raise Exception(f"Failed Request, status code: {response.status_code}")
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
job_criteria = soup.find_all("li", class_="description__job-criteria-item")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")
job_data = JobData(
name=row["name"],
seniority=seniority,
position_type=position_type,
job_function=job_function,
industry=industry
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_posting,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Feel free to change any of the following to control your results:
- MAX_RETRIES: Defines the maximum number of times the script will attempt to retrieve a webpage if the initial request fails (e.g., due to network issues or rate limiting).
- MAX_THREADS: Sets the maximum number of threads that the script will use concurrently during scraping.
- PAGES: The number of pages of job listings to scrape for each keyword.
- LOCATION: The country code or identifier for the region from which job listings should be scraped (e.g., "us" for the United States).
- LOCALITY: The textual representation of the location where the jobs are being scraped (e.g., "United States").
- keyword_list: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]).
You can then run your scraper with python name_of_your_script.py. You'll get a CSV file named after the keyword you searched, plus an individual CSV report for each job.
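If you want to spot-check the crawl output after a run, a quick snippet like this will do it. It assumes the keyword was "software engineer", so the crawler wrote its results to software-engineer.csv.
import csv

with open("software-engineer.csv", newline="", encoding="utf-8") as file:
    for row in csv.DictReader(file):
        print(row["name"], "-", row["job_title"])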
How To Architect Our LinkedIn Jobs Scraper
In order to scrape LinkedIn, we're going to build two different scrapers: a search crawler and a job scraper. At the highest level, this process is relatively simple.
- First, our crawler runs a keyword search for jobs with a certain title, and then saves the results.
- Once we've got results, our scraper will then go through and scrape each individual job posting we find in the results.
Let's break this process down into smaller pieces. We'll start with defining our crawl from start to finish, and then we'll do our scrape.
Step by step, here is how we'll build our crawler:
- Write a search results parser to interpret our data.
- Add pagination; this way, we get more results and finer control over them.
- Create some classes for data storage, and then use them to save our parsed results.
- Use ThreadPoolExecutor to add support for multithreading and therefore concurrency.
- Write a function for proxy integration and use it to bypass LinkedIn's anti-bot system.
Here are the steps we'll go through when building our scraper.
- Write a parser to pull information from individual job postings.
- Give our scraper the ability to read a CSV file.
- Add another class for data storage and build the storage into our parsing function.
- Add ThreadPoolExecutor to scrape posting data concurrently.
- Use our proxy function from earlier to bypass anti-bots.
Understanding How To Scrape LinkedIn Jobs
We can't just plunge straight into coding. We need to look at how this is done from a user standpoint first. We need to understand how to request these pages and how to extract data from them. We also need to see how pagination works and we need to know how to control our location.
Step 1: How To Request LinkedIn Jobs Pages
Anytime you go to a website, it starts with a GET request.
- Our browser makes a GET to LinkedIn.
- Then, LinkedIn sends back an HTML page.
- Our browser reads the page and displays our results.
- Instead of displaying results, our goal is to write a program that reads the results and finds the relevant information.
- We save the information we want and get on with our day.
To GET a search from LinkedIn Job results, we'll use the following URL structure:
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=
If we wanted to search the US for Software Engineer jobs, our URL would look like this:
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=
As you might have noticed from the URL, we're going to use their built-in API to fetch our jobs. Interestingly enough, this API doesn't give us JSON or XML; it sends back straight HTML. Take a look at our search results below.
We also need to make another GET when we scrape an individual job posting. If you look at this page, you can see some finer details such as seniority, job function, employment type, and industries.
Step 2: How To Extract Data From LinkedIn Jobs Results and Pages
Now that we know what these pages look like, we need to see exactly where to pull our information from.
- On the search results page, all of our information is connected to a div card with the class name base-search-card__info.
- On a job page, our information comes as an li element with the class name description__job-criteria-item.
Take a look below at our base-search-card__info.
In this next image, you'll see one of the li items that we would extract.
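If you'd like to experiment with these selectors before wiring up the full crawler, here's a minimal sketch. It assumes you've saved a copy of the search response to a local file named sample_search.html (a hypothetical filename); the class names are the same ones used throughout this article.
from bs4 import BeautifulSoup

# Load a saved copy of the search results HTML
with open("sample_search.html", "r", encoding="utf-8") as file:
    soup = BeautifulSoup(file.read(), "html.parser")

# Grab the first result card and print its title, company, and location
card = soup.find("div", class_="base-search-card__info")
if card:
    print(card.find("h3", class_="base-search-card__title").text.strip())
    print(card.find("h4", class_="base-search-card__subtitle").text.strip())
    print(card.find("span", class_="job-search-card__location").text.strip())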
Step 3: How To Control Pagination
We use pagination to control our search results. We need to add one parameter to our URL, &start={page_number*10}.
Our full URL for the first page (page 0) of the Software Engineer search would look like this:
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=&start=0
We use page_number*10
because we begin counting at 0 and each request yields 10 results. Page 0 (0 * 10) gives us results 1 through 10. Page 1 gives us 11 through 20 and so on and so forth.
Inside our Python code, the URL would look like this:
f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
Step 4: Geolocated Data
To control our geolocation, we'll be using the ScrapeOps Proxy API. This API can take in all sorts of arguments, but the one we use for this is called country.
- If we want to appear in the US, we can pass "country": "us" into the API.
- If we want to appear in the UK, we'd pass "country": "uk".
You can find a full list of ScrapeOps supported countries here.
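As a rough sketch of how that parameter rides along with the request, here is the kind of proxied URL we end up sending when we ask for a UK exit point. The get_scrapeops_url() helper that builds this for us appears later in the article; the API key below is a placeholder.
from urllib.parse import urlencode

payload = {
    "api_key": "your-super-secret-api-key",  # placeholder key
    "url": "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer",
    "country": "uk",
}
# Everything gets URL-encoded and appended to the ScrapeOps endpoint
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))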
Setting Up Our LinkedIn Jobs Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir linkedin-jobs-scraper
cd linkedin-jobs-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build A LinkedIn Jobs Search Crawler
We're now ready to build our crawler. We know what it needs to do and we'll implement it in a series of 5 steps.
- First, we're going to build a basic script with error handling, retry logic, and our basic parser.
- We'll add pagination.
- Create a couple classes and use them to implement data storage.
- Add concurrency to scrape multiple pages simultaneously.
- Integrate with the ScrapeOps Proxy API in order to get past even the most stringent of anti-bot protocols.
Step 1: Create Simple Search Data Parser
Time to begin. We're going to start with a script that sets up our overall structure and adds error handling, retry logic, and a basic parsing function. This will lay the foundation for everything else we add later on.
Pay close attention to our parsing function, scrape_search_results().
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, locality, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer="
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text
search_data = {
"name": company_name,
"job_title": job_title,
"url": job_link,
"location": location
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, LOCALITY)
logger.info(f"Crawl complete.")
- We use soup.find_all("div", class_="base-search-card__info") to find all of our base result cards.
- div_card.find("h4", class_="base-search-card__subtitle").text finds our company_name.
- Our job title is inside an h3, so we use div_card.find("h3", class_="base-search-card__title").text to find it.
- Our link is actually embedded in the parent element, so we extract it with div_card.parent.find("a").
- We then pull the href from the link element with link.get("href").
- Finally, div_card.find("span", class_="job-search-card__location").text gets the job location from the card.
Step 2: Add Pagination
As mentioned earlier, adding pagination is very simple. We just need to add start={page_number*10} to the end of our URL. We also need a function that allows us to scrape multiple pages; we'll call it start_scrape().
Our fully paginated URLs are laid out in the snippet you see below.
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
start_scrape() is in our next snippet. At the moment, it's just a simple for loop that iterates through our pages and parses each one. Later on, we'll make some improvements to it.
def start_scrape(keyword, pages, location, locality, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)
You can see how it all fits together in the full code below.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, locality, page_number, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text
search_data = {
"name": company_name,
"job_title": job_title,
"url": job_link,
"location": location
}
print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
- start={page_number*10} gives us the ability to control pagination inside our URL.
- start_scrape() allows us to parse a list of pages.
Step 3: Storing the Scraped Data
In order to store our data, we need to write a couple of classes.
- Our first one is a dataclass called SearchData.
- The second one is our DataPipeline.
SearchData simply needs to represent individual search items. DataPipeline needs to open a pipe to a CSV file and store SearchData objects inside our CSV.
Here is our SearchData. It holds the name, job_title, url, and location that we find during the parse.
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Once we've got our SearchData, it gets passed into the DataPipeline you see below. The DataPipeline first checks to see if our CSV file exists. If it exists, we append to the file.
If the file doesn't exist, we create one. This approach stops us from accidentally destroying important data. This class also filters out duplicates using the name attribute.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
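Here's a small usage sketch (not part of the final script) showing how SearchData and DataPipeline work together, assuming both classes above are defined in the same file. The values are made up; note that the second item shares a company name with the first, so is_duplicate() drops it before anything is written.
pipeline = DataPipeline(csv_filename="example-results.csv")
pipeline.add_data(SearchData(name="Acme Corp", job_title="Software Engineer", url="https://example.com/job/1", location="United States"))
# Same company name, so the pipeline logs a warning and drops it
pipeline.add_data(SearchData(name="Acme Corp", job_title="Backend Engineer", url="https://example.com/job/2", location="United States"))
pipeline.close_pipeline()  # flushes the remaining item to example-results.csv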
Putting it all together, we get a script that looks like this.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- We use SearchData to represent individual results from our search results page.
- DataPipeline is used to store these objects in a safe and efficient way.
Step 4: Adding Concurrency
To add concurrency support, we're going to use multithreading. Specifically, we're going to use ThreadPoolExecutor and remove the for loop from start_scrape().
ThreadPoolExecutor allows us to open a pool with a maximum of max_threads workers. If we want to use 4 threads, we pass max_threads=4.
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
Our arguments to executor.map() go as follows:
- scrape_search_results: the function we want to call on all these available threads.
- All other arguments get passed in as arrays.
- These arrays of arguments then get passed into the function we're calling on multiple threads.
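If the array-of-arguments pattern looks odd, this tiny standalone sketch (with made-up arguments) shows how executor.map() lines the lists up: each call receives one item from every list, matched by position.
import concurrent.futures

def show_args(keyword, location, page_number):
    print(f"keyword={keyword}, location={location}, page={page_number}")

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(
        show_args,
        ["software engineer"] * 3,  # same keyword for every call
        ["us"] * 3,                 # same location for every call
        range(3)                    # page numbers 0, 1, 2
    )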
Our full code now looks like this.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
We can now crawl multiple pages simultaneously.
Step 5: Bypassing Anti-Bots
To bypass anti-bots, we're going to write a simple function that takes a url and a location. Along with these, the function adds a few fixed parameters and spits out a ScrapeOps proxied URL.
Take a look at get_scrapeops_url().
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
Let's unpack our payload.
- "api_key": our ScrapeOps API key.
- "url": the URL we want to scrape.
- "country": the country we want to appear in.
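As a quick sanity check, here's roughly what the function returns for a UK request (with the placeholder key from earlier); the target URL gets percent-encoded into the query string.
print(get_scrapeops_url("https://www.linkedin.com/jobs", location="uk"))
# https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.linkedin.com%2Fjobs&country=uk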
Our full production crawler is available below.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Step 6: Production Run
Time to run it in production. We're going to crawl 3 pages using 5 threads. If you're looking for different results, try changing any of the following.
- MAX_RETRIES: Defines the maximum number of times the script will attempt to retrieve a webpage if the initial request fails (e.g., due to network issues or rate limiting).
- MAX_THREADS: Sets the maximum number of threads that the script will use concurrently during scraping.
- PAGES: The number of pages of job listings to scrape for each keyword.
- LOCATION: The country code or identifier for the region from which job listings should be scraped (e.g., "us" for the United States).
- LOCALITY: The textual representation of the location where the jobs are being scraped (e.g., "United States").
- keyword_list: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]).
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
Take a look at our results.
As you can see, we crawled 3 pages in 6.99 seconds. This comes out to an average of 2.33 seconds per page.
Build A LinkedIn Jobs Scraper
Now, it's time to build our scraper. It needs to read our crawl CSV, parse each job posting listed in the file, and store the parsed data. Once it can do all of that, we'll add concurrency and proxy support.
Step 1: Create Simple Job Data Parser
Like we did earlier, we'll start by writing a simple parsing function with error handling and retry logic built in. Take a look at process_posting(), and pay close attention to the parsing logic.
def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed Request, status code: {response.status_code}")
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
job_criteria = soup.find_all("li", class_="description__job-criteria-item")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")
job_data = {
"name": row["name"],
"seniority": seniority,
"position_type": position_type,
"job_function": job_function,
"industry": industry
}
print(job_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
soup.find_all("li", class_="description__job-criteria-item")
finds all of our criteria pieces.- The criteria list goes as follows:
job_criteria[0]
: senority leveljob_criteria[1]
: position typejob_criteria[2]
: job functionjob_criteria[3]
: industry
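If you want to see that slicing in isolation, here's a minimal sketch that parses a hypothetical job-criteria list shaped like LinkedIn's; only the class name and the label text are taken from the real page, the values are made up.
from bs4 import BeautifulSoup

# Hypothetical snippet shaped like LinkedIn's job criteria list
html = """
<ul>
<li class="description__job-criteria-item">Seniority level Mid-Senior level</li>
<li class="description__job-criteria-item">Employment type Full-time</li>
<li class="description__job-criteria-item">Job function Engineering</li>
<li class="description__job-criteria-item">Industries Software Development</li>
</ul>
"""
soup = BeautifulSoup(html, "html.parser")
job_criteria = soup.find_all("li", class_="description__job-criteria-item")
print(job_criteria[0].text.replace("Seniority level", "").strip())  # Mid-Senior level
print(job_criteria[1].text.replace("Employment type", "").strip())  # Full-time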
Step 2: Loading URLs To Scrape
Without a CSV file to read, our parsing function is pretty useless. We're going to write a function that reads a CSV file and uses a for loop to call process_posting() on each row from the file.
Here is our first iteration of process_results(). Later on, we'll rewrite it and add multithreading support.
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_posting(row, location, retries=retries)
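For reference, each row that csv.DictReader hands to process_posting() is just a dictionary keyed by the crawl CSV's headers. The values below are made up, but the shape is what the function expects.
row = {
    "name": "Acme Corp",
    "job_title": "Software Engineer",
    "url": "https://www.linkedin.com/jobs/view/1234567890",
    "location": "United States",
}
print(row["url"])  # the field process_posting() actually requests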
In the full code below, the script has been updated to perform a crawl and then scrape the individual job postings it found.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed Request, status code: {response.status_code}")
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
job_criteria = soup.find_all("li", class_="description__job-criteria-item")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")
job_data = {
"name": row["name"],
"seniority": seniority,
"position_type": position_type,
"job_function": job_function,
"industry": industry
}
print(job_data)
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_posting(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
We already have a DataPipeline. Storing our data will be very easy at this point. We just need another dataclass. Take a look below at JobData.
Just like our SearchData from earlier, we use it to represent the data we scraped from the page.
@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
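Here's a quick sketch of the field cleanup in action, assuming the JobData class above is defined: empty strings become "No <field>" placeholders and extra whitespace gets stripped.
job = JobData(name="Acme Corp ", seniority="", position_type="Full-time")
print(job)
# JobData(name='Acme Corp', seniority='No seniority', position_type='Full-time',
#         job_function='No job_function', industry='No industry')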
In our full code below, our parsing function now opens a DataPipeline. Then, instead of printing our parsed data, we create a JobData object out of it and pass that JobData into the pipeline.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
location = div_card.find("span", class_="job-search-card__location").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
location=location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
        response = requests.get(url)
try:
if response.status_code != 200:
raise Exception(f"Failed Request, status code: {response.status_code}")
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
job_criteria = soup.find_all("li", class_="description__job-criteria-item")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")
job_data = JobData(
name=row["name"],
seniority=seniority,
position_type=position_type,
job_function=job_function,
industry=industry
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_posting(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, retries=MAX_RETRIES)
- `JobData` holds the data we pull from the page.
- `DataPipeline` takes a `JobData` object and pipes it to a CSV file (see the short usage sketch below).
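If it helps to see those two pieces working together, here is a minimal usage sketch. It assumes the `JobData` and `DataPipeline` classes defined above; the filename and field values are hypothetical. We create a pipeline, add a single record, and close the pipeline so the record gets flushed to disk.
# Minimal usage sketch -- hypothetical values, relies on the classes defined above.
pipeline = DataPipeline(csv_filename="example-job.csv")
record = JobData(
    name="Example Corp",
    seniority="Mid-Senior level",
    position_type="Full-time",
    job_function="Engineering",
    industry="Software Development"
)
pipeline.add_data(record)      # queued (and de-duplicated by name)
pipeline.close_pipeline()      # flushes the queue to example-job.csv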
Step 4: Adding Concurrency
For concurrency support, we're going to use `ThreadPoolExecutor` like we did earlier. Take a look at our refactored version of `process_results()`.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_posting,
reader,
[location] * len(reader),
[retries] * len(reader)
)
Look at our arguments to `executor.map()`:
- `process_posting`: the function we want to call on multiple threads.
- All arguments to `process_posting` get passed in as arrays (see the toy example below).
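If the way those argument lists line up isn't obvious, here is a tiny standalone illustration. The worker function and values are made up and are not part of our scraper: `executor.map()` consumes the iterables positionally, so the first call receives the first element of each list, the second call receives the second, and so on.
import concurrent.futures

# Hypothetical stand-in for process_posting.
def worker(row, location, retries):
    return f"{row} | {location} | {retries}"

rows = ["job-1", "job-2", "job-3"]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(
        worker,
        rows,                  # first argument: a different row for each call
        ["us"] * len(rows),    # second argument: the same location for each call
        [3] * len(rows)        # third argument: the same retry count for each call
    ))
print(results)
# ['job-1 | us | 3', 'job-2 | us | 3', 'job-3 | us | 3']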
Step 5: Bypassing Anti-Bots
To bypass anti-bots with our scraper, we just need to reuse the `get_scrapeops_url()` function we wrote at the beginning of our crawler. We only need to change one line in our parsing function: the GET request.
response = requests.get(get_scrapeops_url(url, location=location))
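If you want to sanity-check what that one-line change actually requests, you can print the wrapped URL before using it. This little snippet assumes the `get_scrapeops_url()` function from our crawler, and the job URL in it is made up; the target URL gets percent-encoded into the proxy's `url` query parameter.
# Hypothetical sanity check: inspect the proxied URL before sending the request.
sample_job_url = "https://www.linkedin.com/jobs/view/1234567890"
print(get_scrapeops_url(sample_job_url, location="us"))
# Prints something like:
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.linkedin.com%2Fjobs%2Fview%2F1234567890&country=us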
Our full production code is available below.
import os
import csv
import requests
import json
import logging
import time  # needed for the sleep in DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
job_title: str = ""
url: str = ""
location: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class JobData:
name: str = ""
seniority: str = ""
position_type: str = ""
job_function: str = ""
industry: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
formatted_locality = locality.replace(" ", "+")
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div", class_="base-search-card__info")
for div_card in div_cards:
company_name = div_card.find("h4", class_="base-search-card__subtitle").text
job_title = div_card.find("h3", class_="base-search-card__title").text
link = div_card.parent.find("a")
job_link = link.get("href")
                # use a separate name so we don't overwrite the `location` (proxy country) argument
                job_location = div_card.find("span", class_="job-search-card__location").text
search_data = SearchData(
name=company_name,
job_title=job_title,
url=job_link,
                    location=job_location
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
[locality] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
def process_posting(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code != 200:
raise Exception(f"Failed Request, status code: {response.status_code}")
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
job_criteria = soup.find_all("li", class_="description__job-criteria-item")
seniority = job_criteria[0].text.replace("Seniority level", "")
position_type = job_criteria[1].text.replace("Employment type", "")
job_function = job_criteria[2].text.replace("Job function", "")
industry = job_criteria[3].text.replace("Industries", "")
job_data = JobData(
name=row["name"],
seniority=seniority,
position_type=position_type,
job_function=job_function,
industry=industry
)
job_pipeline.add_data(job_data)
job_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_posting,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 3
LOCATION = "us"
LOCALITY = "United States"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["software engineer"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv"