Then check out ScrapeOps, the complete toolkit for web scraping.
To use this code, you'll need a config.json file that holds your ScrapeOps API key:

```json
{"api_key": "your-super-secret-api-key"}
```
Then, paste the code below into a new Python file.

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class JobData:
    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")

            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=location
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_posting(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            job_criteria = soup.find_all("li", class_="description__job-criteria-item")
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = JobData(
                name=row["name"],
                seniority=seniority,
                position_type=position_type,
                job_function=job_function,
                industry=industry
            )
            job_pipeline.add_data(job_data)
            job_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_posting,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
You can customize your results by changing any of these constants from our main:

- MAX_RETRIES: Defines the maximum number of times the script will attempt to retrieve a webpage if the initial request fails (e.g., due to network issues or rate limiting).
- MAX_THREADS: Sets the maximum number of threads that the script will use concurrently during scraping.
- PAGES: The number of pages of job listings to scrape for each keyword.
- LOCATION: The country code or identifier for the region from which job listings should be scraped (e.g., "us" for the United States).
- LOCALITY: The textual representation of the location where the jobs are being scraped (e.g., "United States").
- keyword_list: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]).

To run the scraper, use python name_of_your_script.py. You'll get a CSV named after the keyword you searched. Then, you'll get an individual CSV report on each job as well.

In this build, we use:

- ThreadPoolExecutor to add support for multithreading and therefore concurrency.
- ThreadPoolExecutor to scrape posting data concurrently.

Our search URLs are laid out like this:

https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=
If we search for software engineer jobs, the URL looks like this:

https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location={formatted_locality}&original_referer=

On the search results page, each job is embedded in a div card with the class name base-search-card__info. On an individual job posting page, each piece of job criteria is an li element with the class name description__job-criteria-item. These base-search-card__info cards and li items hold the data we want to extract.

To paginate our results, we add &start={page_number*10} to the URL. Our full URL for the first page (page 0) of the Software Engineer search would look like this:

https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=&start=0

We use page_number*10 because we begin counting at 0 and each request yields 10 results. Page 0 (0 * 10) gives us results 1 through 10. Page 1 gives us 11 through 20, and so on and so forth.

Inside our Python code, the URL looks like this:

```python
f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
```
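If the arithmetic is easier to see in code, here is a quick sketch (not part of the scraper itself) of how page numbers map onto the start parameter:

```python
# Quick illustration of the page_number * 10 offset described above.
for page_number in range(3):
    start = page_number * 10
    print(f"page {page_number}: start={start} -> results {start + 1} to {start + 10}")

# page 0: start=0 -> results 1 to 10
# page 1: start=10 -> results 11 to 20
# page 2: start=20 -> results 21 to 30
```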
The ScrapeOps Proxy API gives us control over our geolocation through the country parameter. If we want to appear in the US, we pass "country": "us" into the API. If we want to appear in the UK, we pass "country": "uk".
mkdir linkedin-jobs-scraper
cd linkedin-jobs-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
We'll start by writing a parsing function, scrape_search_results().

```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer="
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")

            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                location = div_card.find("span", class_="job-search-card__location").text

                search_data = {
                    "name": company_name,
                    "job_title": job_title,
                    "url": job_link,
                    "location": location
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        scrape_search_results(keyword, LOCATION, LOCALITY)
    logger.info(f"Crawl complete.")
```
soup.find_all("div", class_="base-search-card__info")
to find all of our base result cards.div_card.find("h4", class_="base-search-card__subtitle").text
finds our company_name
.h3
, so we use div_card.find("h3", class_="base-search-card__title").text
to find it.div_card.parent.find("a")
.href
from the link element with link.get("href")
.div_card.find("span", class_="job-search-card__location").text
gets the job location from the card.start={page_number*10}
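If you want to see these selectors in isolation, here is a minimal sketch that runs them against a made-up HTML fragment (real LinkedIn markup contains far more than this; only the class names come from the tutorial):

```python
from bs4 import BeautifulSoup

# A fake result card that mimics the structure described above.
html = """
<div class="base-card">
  <a href="https://www.linkedin.com/jobs/view/example-123"></a>
  <div class="base-search-card__info">
    <h3 class="base-search-card__title">Software Engineer</h3>
    <h4 class="base-search-card__subtitle">Example Corp</h4>
    <span class="job-search-card__location">United States</span>
  </div>
</div>
"""

soup = BeautifulSoup(html, "html.parser")
card = soup.find("div", class_="base-search-card__info")
print(card.find("h4", class_="base-search-card__subtitle").text)   # Example Corp
print(card.find("h3", class_="base-search-card__title").text)      # Software Engineer
print(card.parent.find("a").get("href"))                           # the job link
print(card.find("span", class_="job-search-card__location").text)  # United States
```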
To paginate our results, we add start={page_number*10} to the end of our URL. We also need a function that allows us to scrape multiple pages; we'll call it start_scrape(). Our fully paginated URLs are laid out in the snippet you see below.

```python
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
```
start_scrape() is in our next snippet. At the moment, it's just a simple for loop that parses pages using iteration. Later on, we'll make some improvements to it.

```python
def start_scrape(keyword, pages, location, locality, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, locality, page, retries=retries)
```
Here is our full code up to this point:

```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, page_number, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")

            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                location = div_card.find("span", class_="job-search-card__location").text

                search_data = {
                    "name": company_name,
                    "job_title": job_title,
                    "url": job_link,
                    "location": location
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, locality, page, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
```
- start={page_number*10} gives us the ability to control pagination inside our URL.
- start_scrape() allows us to parse a list of pages.

To store our data, we need a dataclass called SearchData and a DataPipeline. SearchData simply needs to represent individual search items. DataPipeline needs to open a pipe to a CSV file and store SearchData objects inside our CSV.

Here is our SearchData. It holds the name, job_title, url, and location that we find during the parse.

```python
@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
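Here is a quick look (assuming the class above is already defined) at what __post_init__ does to the fields:

```python
item = SearchData(name="  Example Corp  ", job_title="Software Engineer", url="", location="United States")
print(item.name)  # "Example Corp" -- surrounding whitespace is stripped
print(item.url)   # "No url" -- empty strings get default text
```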
Once we have our SearchData, it gets passed into the DataPipeline you see below. The DataPipeline first checks to see if our CSV file exists. If it exists, we append to it. If the file doesn't exist, we create one. This approach stops us from accidentally destroying important data. This class also filters out duplicates using the name attribute.

```python
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
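As a small usage sketch (assuming the SearchData and DataPipeline classes above), the pipeline is used like this; note that the second item is dropped because it shares a name with the first:

```python
pipeline = DataPipeline(csv_filename="example-output.csv")
pipeline.add_data(SearchData(name="Example Corp", job_title="Software Engineer",
                             url="https://www.linkedin.com/jobs/view/example-1", location="United States"))
pipeline.add_data(SearchData(name="Example Corp", job_title="Backend Engineer",
                             url="https://www.linkedin.com/jobs/view/example-2", location="United States"))
pipeline.close_pipeline()  # flush whatever is still in the storage queue to the CSV
```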
Here is our full code up to this point:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")

            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=location
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
- SearchData is used to represent individual results from our search results page.
- DataPipeline is used to store these objects in a safe and efficient way.

Next, we're going to add ThreadPoolExecutor and we're going to remove our for loop from start_scrape(). ThreadPoolExecutor allows us to open a pool with max_threads. If we want to use 4 threads, we pass max_threads=4.

```python
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
```
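If executor.map() is new to you, here is a toy example (separate from the scraper) showing how it pairs up the argument lists, taking one element from each list per call:

```python
import concurrent.futures

def fake_scrape(keyword, page):
    # Stand-in for scrape_search_results(); just shows which arguments each call receives.
    print(f"scraping '{keyword}', page {page}")

pages = 3
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(fake_scrape, ["software engineer"] * pages, range(pages))
```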
The arguments to executor.map() go as follows:

- scrape_search_results: the function we want to call on all these available threads.
- All of the other arguments get passed in as lists, one element per page.

Here is our full code up to this point:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")

            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=location
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
Next, we'll add proxy support. Our proxy function needs to take in a url and a location. Along with these, the function will handle some set parameters and spit out a ScrapeOps proxied URL. Take a look at get_scrapeops_url().

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
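Assuming a valid API key in config.json, wrapping a target URL looks roughly like this (the query-string order follows the payload dict):

```python
target_url = "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer"
print(get_scrapeops_url(target_url, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=<your-key>&url=https%3A%2F%2Fwww.linkedin.com%2F...&country=us
```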
It converts any regular URL into a ScrapeOps proxied URL by wrapping these parameters into a payload:

- "api_key": our ScrapeOps API key.
- "url": the url we want to scrape.
- "country": the country we want to appear in.

Here is our full code up to this point:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")

            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=location
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
You can change any of these constants in main to tweak your crawl:

- MAX_RETRIES: Defines the maximum number of times the script will attempt to retrieve a webpage if the initial request fails (e.g., due to network issues or rate limiting).
- MAX_THREADS: Sets the maximum number of threads that the script will use concurrently during scraping.
- PAGES: The number of pages of job listings to scrape for each keyword.
- LOCATION: The country code or identifier for the region from which job listings should be scraped (e.g., "us" for the United States).
- LOCALITY: The textual representation of the location where the jobs are being scraped (e.g., "United States").
- keyword_list: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]).

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
```
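For example, to crawl five pages of data engineer listings from the UK instead (hypothetical values chosen only for illustration), you would only change these constants:

```python
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
LOCATION = "uk"
LOCALITY = "United Kingdom"
keyword_list = ["data engineer"]
```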
Next, we write another parsing function, process_posting(). Like before, pay close attention to our parsing logic.

```python
def process_posting(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            job_criteria = soup.find_all("li", class_="description__job-criteria-item")
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = {
                "name": row["name"],
                "seniority": seniority,
                "position_type": position_type,
                "job_function": job_function,
                "industry": industry
            }
            print(job_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
soup.find_all("li", class_="description__job-criteria-item")
finds all of our criteria pieces.job_criteria[0]
: senority leveljob_criteria[1]
: position typejob_criteria[2]
: job functionjob_criteria[3]
: industryfor
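The index-based approach above assumes LinkedIn always lists these four criteria in the same order. As a more defensive alternative (a sketch, not part of the tutorial code), you could match each li by its label text instead, reusing the class name and labels from the parser above:

```python
CRITERIA_LABELS = ["Seniority level", "Employment type", "Job function", "Industries"]

def parse_job_criteria(soup):
    criteria = {}
    for item in soup.find_all("li", class_="description__job-criteria-item"):
        text = item.text.strip()
        for label in CRITERIA_LABELS:
            if label in text:
                criteria[label] = text.replace(label, "").strip()
    return criteria
```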
We also need a for loop to call process_posting() on each row from the file. Here is our first iteration of process_results(). Later on, we'll rewrite it and add multithreading support.

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_posting(row, location, retries=retries)
```
Here is our full code up to this point:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")

            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=location
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_posting(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            job_criteria = soup.find_all("li", class_="description__job-criteria-item")
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = {
                "name": row["name"],
                "seniority": seniority,
                "position_type": position_type,
                "job_function": job_function,
                "industry": industry
            }
            print(job_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_posting(row, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
```
We already have our DataPipeline. Storing our data will be very easy at this point. We just need another dataclass. Take a look below at JobData. Just like our SearchData from earlier, we use it to represent the data we scraped from the page.

```python
@dataclass
class JobData:
    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
Inside our parsing function, we now open a new DataPipeline. Then, instead of printing our parsed data, we create a JobData object out of it and then pass our JobData into the pipeline.

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class JobData:
    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")

            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=location
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_posting(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            job_criteria = soup.find_all("li", class_="description__job-criteria-item")
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = JobData(
                name=row["name"],
                seniority=seniority,
                position_type=position_type,
                job_function=job_function,
                industry=industry
            )
            job_pipeline.add_data(job_data)
            job_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_posting(row, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
```
- JobData holds the data we pull from the page.
- DataPipeline takes a JobData object and pipes it to a CSV file.

To finish up, we'll once again use ThreadPoolExecutor like we did earlier. Take a look at our refactored version of process_results().

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_posting,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
Here are the arguments to executor.map():

- process_posting: the function we want to call on multiple threads.
- All of the other arguments to process_posting get passed in as arrays.

We also want these requests to go through the proxy, so the only change inside process_posting is this line:

```python
response = requests.get(get_scrapeops_url(url, location=location))
```
Here is our full code with everything put together:

```python
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class JobData:
    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")

            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=location
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_posting(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            job_criteria = soup.find_all("li", class_="description__job-criteria-item")
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = JobData(
                name=row["name"],
                seniority=seniority,
                position_type=position_type,
                job_function=job_function,
                industry=industry
            )
            job_pipeline.add_data(job_data)
            job_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_posting,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
If you'd like to tweak your results, change any of the constants inside main; you can see it again below.

```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
Whenever you scrape a site, you are subject to its terms of service and its robots.txt. Their terms are available here and their robots.txt is here. As stated at the top of their robots.txt, crawling LinkedIn is explicitly prohibited. By scraping LinkedIn, you can have your account suspended, banned, or even deleted. Always ensure compliance with LinkedIn's policies and consider using official APIs or getting explicit permission for large-scale data extraction.

Then check out ScrapeOps, the complete toolkit for web scraping.
To run the code below, you'll need a ScrapeOps API key saved in a config.json file:

```json
{"api_key": "your-super-secret-api-key"}
```

Once that file sits next to your script, python name_of_your_script.py
is the command you'll use to run the scraper.import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass JobData: name: str = "" seniority: str = "" position_type: str = "" job_function: str = "" industry: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_posting(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(get_scrapeops_url(url, location=location)) job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") seniority = job_criteria[0].text.replace("Seniority level", "") position_type = job_criteria[1].text.replace("Employment type", "") job_function = job_criteria[2].text.replace("Job function", "") industry = job_criteria[3].text.replace("Industries", "") job_data = JobData( name=row["name"], seniority=seniority, position_type=position_type, job_function=job_function, industry=industry ) job_pipeline.add_data(job_data) job_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries 
+= 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_posting, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
You can change any of the following constants inside main to fine-tune your results:

- MAX_RETRIES: Defines the maximum number of times the script will attempt to retrieve a webpage if the initial request fails (e.g., due to network issues or rate limiting).
- MAX_THREADS: Sets the maximum number of threads that the script will use concurrently during scraping.
- PAGES: The number of pages of job listings to scrape for each keyword.
- LOCATION: The country code or identifier for the region from which job listings should be scraped (e.g., "us" for the United States).
- LOCALITY: The textual representation of the location where the jobs are being scraped (e.g., "United States").
- keyword_list: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]).

The crawl uses ThreadPoolExecutor to add support for multithreading and therefore concurrency, and the posting scraper uses ThreadPoolExecutor to scrape posting data concurrently.
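For instance, a hypothetical UK-focused run might tweak those constants like this; the specific values below are my own illustration, not from the article's run:

```python
MAX_RETRIES = 3
MAX_THREADS = 4              # modest thread pool
PAGES = 5                    # results pages per keyword (10 jobs per page)
LOCATION = "uk"              # proxy country code passed to ScrapeOps
LOCALITY = "United Kingdom"  # location text inserted into the search URL

# Illustrative keyword list; any job titles work here
keyword_list = ["software engineer", "data engineer"]
```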
Here is the URL we use to fetch search results from LinkedIn:

```
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=
```

For the keyword "software engineer", it looks like this:

```
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location={formatted_locality}&original_referer=
```

If you look at the base of this URL (https://www.linkedin.com/jobs-guest/jobs/api), you might notice something interesting. We're actually making API requests, hence the endpoint, /api.
Something even more interesting: this API endpoint doesn't give us JSON or XML, it sends back straight HTML. In years of web development and scraping, LinkedIn is the only place I've ever seen something like this. The screenshot below gives us a barebones HTML page without any styling whatsoever, but it is in fact a webpage.

On the search results page, our data is nested inside div elements with the class base-search-card__info. On individual job postings, the criteria we want come as li elements with a class of description__job-criteria-item.

Each result card is a div. Its class name is base-search-card__info. To extract this data, we need to find each div that matches this class.

On the posting page, there is also a list of li elements we want to scrape. Each li element is given the classname, description__job-criteria-item. So for these, we want to pull all li elements with this class.

To paginate our results, we append &start={page_number*10} to the URL. Our full URL for page 1 of the Software Engineer search is built with the f-string below. We use page_number*10 because we begin counting at 0 and each request yields 10 results.

```python
f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
```
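As a quick illustration of the offset math (not part of the scraper itself), here's how page numbers map onto the start parameter:

```python
# Each page of results holds 10 listings and page numbering starts at 0
for page_number in range(3):
    print(f"page_number={page_number} -> &start={page_number * 10}")

# Output:
# page_number=0 -> &start=0
# page_number=1 -> &start=10
# page_number=2 -> &start=20
```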
When we use the ScrapeOps Proxy API, we can control our geolocation with the country parameter. If we want to appear in the US, we pass "country": "us" into the API. If we want to appear in the UK, we pass "country": "uk".
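As a small sketch of how that plays out when the proxy URL is built (the target URL below is just a placeholder):

```python
from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"     # placeholder key
target = "https://www.linkedin.com/jobs"  # placeholder URL

us_payload = {"api_key": API_KEY, "url": target, "country": "us"}
uk_payload = {"api_key": API_KEY, "url": target, "country": "uk"}

# Only the country field changes between the two requests
print("https://proxy.scrapeops.io/v1/?" + urlencode(us_payload))
print("https://proxy.scrapeops.io/v1/?" + urlencode(uk_payload))
```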
To follow along, create a new project folder, set up a virtual environment, and install your dependencies:

```
mkdir linkedin-jobs-scraper
cd linkedin-jobs-scraper
python -m venv venv
source venv/bin/activate
pip install selenium
```
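One note on drivers: Selenium 4.6 and newer ships with Selenium Manager, which downloads a matching chromedriver for you; on older versions you'd need to install chromedriver yourself. If you'd like to pin a recent version when installing (purely an example):

```
pip install "selenium>=4.6"
```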
Our first iteration is a simple script built around one parsing function, scrape_search_results()
.import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, locality, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = { "name": company_name, "job_title": job_title, "url": job_link, "location": location } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") scrape_search_results(keyword, LOCATION, LOCALITY, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
Here's how the parsing logic works:

- options = webdriver.ChromeOptions() creates a custom set of Chrome options. Then we use options.add_argument("--headless") to set our browser to headless mode.
- driver = webdriver.Chrome(options=options) launches Selenium with our custom options.
- We use driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") to find all of our base result cards.
- company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text finds our company_name.
- The job title is held in an h3, so we use div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text to find it.
- To get the link to the job posting, we first find the parent of the div_card: div_card.find_element(By.XPATH, ".."). We use XPATH and pass in ".." to find the parent.
- We then find the link element with parent.find_element(By.CSS_SELECTOR, "a").
- We pull the href from the link element with link.get_attribute("href").
- div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text gets the job location from the card.

To paginate our results, we add start={page_number*10}
to the end of our URL. We need an additional function to scrape multiple pages. We'll call it start_scrape()
. Our fully paginated URLs are laid out in the snippet you see below.

```python
url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
```
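As a quick illustration of how the placeholders in that f-string get filled in (the values here are just examples):

```python
keyword = "software engineer"
locality = "United States"
page_number = 0

formatted_keyword = keyword.replace(" ", "+")    # "software+engineer"
formatted_locality = locality.replace(" ", "+")  # "United+States"

url = (
    "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search"
    f"?keywords={formatted_keyword}&location={formatted_locality}"
    f"&original_referer=&start={page_number * 10}"
)
print(url)
```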
start_scrape() is in our next snippet. At the moment, it's just a simple for loop that parses pages using iteration. Later on, we'll make some improvements to it.

```python
def start_scrape(keyword, pages, location, locality, retries=3):
    # pages is an integer count, so iterate over range(pages)
    for page in range(pages):
        scrape_search_results(keyword, location, locality, page, retries=retries)
```
import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, locality, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = { "name": company_name, "job_title": job_title, "url": job_link, "location": location } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, retries=3): for page in pages: scrape_search_results(keyword, location, locality, page, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
In this snippet, start={page_number*10} controls our pagination. With start_scrape(), we can parse a list of pages.

To store the data we extract, we need two more pieces. The first is a dataclass called SearchData. The second one is our DataPipeline.

SearchData simply needs to represent individual search items. DataPipeline needs to open a pipe to a CSV file and store SearchData objects inside our CSV.

Here is our SearchData. It holds the name, job_title, url and location that we find during the parse.

```python
@dataclass
class SearchData:
    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
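To see what that __post_init__ cleanup actually does, here's a tiny standalone check (assuming SearchData is defined as above; the values are made up):

```python
example = SearchData(name="  LinkedIn  ", job_title="", url="https://example.com/job/1", location="")
print(example.name)       # "LinkedIn" -- leading/trailing whitespace stripped
print(example.job_title)  # "No job_title" -- empty strings get default text
print(example.location)   # "No location"
```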
Once we've got our SearchData, we pass it into the DataPipeline you see below. DataPipeline first checks to see if our CSV file exists. If the file doesn't exist yet, it writes a header row; otherwise it appends to the existing file. It also filters out duplicate items by their name attribute.

```python
class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)  # note: this requires `import time` at the top of the script
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```
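Here's a minimal usage sketch of the pipeline on its own (filenames and values are made up for illustration; it assumes SearchData and DataPipeline are defined as above):

```python
pipeline = DataPipeline(csv_filename="example-jobs.csv", storage_queue_limit=50)

pipeline.add_data(SearchData(name="Acme Corp", job_title="Software Engineer",
                             url="https://example.com/job/1", location="United States"))
# A second item with the same name is treated as a duplicate and dropped
pipeline.add_data(SearchData(name="Acme Corp", job_title="Another Listing",
                             url="https://example.com/job/2", location="United States"))

pipeline.close_pipeline()  # flushes whatever is left in the queue out to example-jobs.csv
```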
import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3): for page in pages: scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
We use SearchData to represent individual results from our search results page. DataPipeline is used to store these objects in a safe and efficient way.

Next, we're going to add ThreadPoolExecutor and we're going to remove our for loop from start_scrape(). ThreadPoolExecutor allows us to open a pool with max_threads. If we want to use 4 threads, we pass max_threads=4.

```python
def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
```
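If the repeated-list arguments look strange, here's a tiny self-contained illustration of how executor.map() pairs them up; it has nothing to do with LinkedIn, it just shows the mechanics:

```python
import concurrent.futures

def greet(name, greeting):
    return f"{greeting}, {name}!"

names = ["Ada", "Grace", "Linus"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each call takes one element from each iterable:
    # greet("Ada", "Hello"), greet("Grace", "Hello"), greet("Linus", "Hello")
    results = list(executor.map(greet, names, ["Hello"] * len(names)))

print(results)
```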
The arguments to executor.map() go as follows:

- scrape_search_results
: the function we want to call on all these available threads.import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
To route our requests through the ScrapeOps Proxy API, we write a simple function, get_scrapeops_url().

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
This function takes any URL and converts it into a proxied one. Everything gets wrapped into a payload:

- "api_key": our ScrapeOps API key.
- "url": the url we want to scrape.
- "country"
: the country we want to appear in.import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Feel free to change any of the following constants to tweak your results:

- MAX_RETRIES
- MAX_THREADS
- PAGES
- LOCATION
- LOCALITY
- keyword_list
```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
Now we'll write a parsing function for individual job postings, process_posting(). Like before, pay close attention to our parsing logic.

```python
def process_posting(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=options)
        try:
            # driver.get() only takes a URL; location comes into play later when we add the proxy
            driver.get(url)

            job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = {
                "name": row["name"],
                "seniority": seniority,
                "position_type": position_type,
                "job_function": job_function,
                "industry": industry
            }
            print(job_data)

            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
```
driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") finds all the items from our criteria list:

- job_criteria[0]: seniority level
- job_criteria[1]: position type
- job_criteria[2]: job function
- job_criteria[3]: industry

Next, we need a for loop to scrape details from every posting we found. Here is our first iteration of process_results(). Later on, we'll rewrite it and add multithreading support.

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_posting(row, location, retries=retries)
```
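One thing to watch in process_posting(): indexing job_criteria[0] through job_criteria[3] assumes every posting exposes all four criteria. A slightly more defensive variant (my own sketch, not the article's code) pads the list first:

```python
def extract_criteria(job_criteria):
    # Collect the text of each criteria item, padding with empty strings
    # so missing entries don't raise an IndexError
    texts = [item.text for item in job_criteria] + [""] * 4
    seniority = texts[0].replace("Seniority level", "")
    position_type = texts[1].replace("Employment type", "")
    job_function = texts[2].replace("Job function", "")
    industry = texts[3].replace("Industries", "")
    return seniority, position_type, job_function, industry
```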
import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_posting(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url, location=location) job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") seniority = job_criteria[0].text.replace("Seniority level", "") position_type = job_criteria[1].text.replace("Employment type", "") job_function = job_criteria[2].text.replace("Job function", "") industry = job_criteria[3].text.replace("Industries", "") job_data = { "name": row["name"], "seniority": seniority, "position_type": position_type, "job_function": job_function, "industry": industry } print(job_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: 
{row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_posting(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
To store this data, we already have a working DataPipeline. We just need another dataclass. Take a look below at JobData. Just like our SearchData from earlier, we use it to represent the data we scraped from the page. We'll pass this into our DataPipeline which will then pipe our data into a CSV file.

```python
@dataclass
class JobData:
    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
Inside process_posting(), we now open a new DataPipeline. Then, instead of printing our parsed data, we create a JobData object out of it and then pass our JobData
into the pipeline.import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass JobData: name: str = "" seniority: str = "" position_type: str = "" job_function: str = "" industry: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_posting(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url, location=location) job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") seniority = job_criteria[0].text.replace("Seniority level", "") position_type = job_criteria[1].text.replace("Employment type", "") job_function = job_criteria[2].text.replace("Job function", "") industry = job_criteria[3].text.replace("Industries", "") job_data = JobData( name=row["name"], seniority=seniority, position_type=position_type, job_function=job_function, industry=industry ) job_pipeline.add_data(job_data) job_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries += 1 finally: 
driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_posting(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
JobData holds the data we pull from the page. DataPipeline takes a JobData object and pipes it to a CSV file.

Next, we'll use ThreadPoolExecutor for concurrency just like we did earlier. Take a look at our refactored version of process_results().

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_posting,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
Here are the arguments to executor.map():

- process_posting: the function we want to call on multiple threads.
- Our other arguments to process_posting get passed in as arrays, one element per call.

Finally, to route each posting request through the proxy, we change the fetch inside process_posting() to:

driver.get(get_scrapeops_url(url, location=location))
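In other words, adding proxy support to the posting scraper is a one-line change; a rough before/after sketch using the same names as the code above:

```python
# Before adding the proxy: fetch the posting URL directly
driver.get(url)

# After adding the proxy: wrap the URL with get_scrapeops_url() first
driver.get(get_scrapeops_url(url, location=location))
```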
import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass JobData: name: str = "" seniority: str = "" position_type: str = "" job_function: str = "" industry: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_posting(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(get_scrapeops_url(url, location=location)) job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") seniority = job_criteria[0].text.replace("Seniority level", "") position_type = job_criteria[1].text.replace("Employment type", "") job_function = job_criteria[2].text.replace("Job function", "") industry = job_criteria[3].text.replace("Industries", "") job_data = JobData( name=row["name"], seniority=seniority, position_type=position_type, job_function=job_function, industry=industry ) job_pipeline.add_data(job_data) job_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries 
+= 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_posting, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
PAGES
to 3 and our MAX_THREADS
to 5.If you need a refresher on our main
, you can see it again below.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 3 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
robots.txt
. Their terms are available here and their robots.txt
is here.As stated at the top of their robots.txt
, crawling LinkedIn is explicitly prohibited. By scraping LinkedIn, you can have your account suspended, banned, or even deleted.Then check out ScrapeOps, the complete toolkit for web scraping.
config.json
file to it.{"api_key": "your-super-secret-api-key"}
.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, 
location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processJob(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await page.goto(getScrapeOpsUrl(url, location), { timeout: 0 }); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const jobCriteria = await page.$$("li[class='description__job-criteria-item']"); if (jobCriteria.length < 4) { throw new Error("Job Criteria Not Found!"); } const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", ""); const positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", ""); const jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", ""); const industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", ""); const jobData = { name: row.name, seniority: seniority.trim(), position_type: positionType.trim(), job_function: jobFunction.trim(), industry: industry.trim() } await writeToCsv([jobData], `${row.name.replace(" ", "-")}-${row.job_title.replace(" ", "-")}.csv`); success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } } async function processResults(csvFile, location, concurrencyLimit, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; while (rows.length > 0) { const currentBatch = rows.splice(0, concurrencyLimit); const tasks = currentBatch.map(row => processJob(browser, row, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close(); } async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { console.time("processResults"); await processResults(file, location, concurrencyLimit, retries); console.timeEnd("processResults"); } console.log("Scrape complete");} main();
main
to fine-tune your results:keywords
: An array of job titles or terms to be used as search queries on LinkedInconcurrencyLimit
: The maximum number of pages or tasks processed concurrently.pages
: The number of pages of search results to crawl for each keyword.location
: A two-letter country code (e.g., "us") specifying the country for the search results.locality
: The human-readable location name (e.g., "United States") used in the search query.retries
: The number of retry attempts allowed for failed tasks (e.g., failed page loads or data extractions).node name-of-your-script
or node name-of-your-script.js
will run the scraper.Modern NodeJS doesn't require a file extension in the name.Once it's done running, you'll get a CSV named after your search. This one will contain all of your search data. You get an individual report generated for each job listing as well. These individual files contain more detailed information about each job posting.ThreadPoolExecutor
to add support for multithreading and therefore concurrency.ThreadPoolExecutor
to scrape posting data concurrently.https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=united+states&original_referer=
https://www.linkedin.com/jobs-guest/jobs/api
/api
inside of it. Our requests are actually going to their API.Surprisingly, this API endpoint doesn't respond with JSON or XML; it gives us straight HTML. In years of web development and scraping, LinkedIn is the only place I've ever seen this.The screenshot below gives us a barebones HTML page without any styling whatsoever, but it is in fact a webpage. When you're viewing data from the main page, the page fetches this HTML and uses it to update your screen.div
elements. Each one we want has a class name of base-search-card__info
.For individual job pages, we look for li
elements with a class of description__job-criteria-item
.In the image below, you can see a div
. Its class name is base-search-card__info
. This is one of our search results. To extract this data, we need to find each div
matching this class.li
element we want to scrape. Each li
element has the class name description__job-criteria-item
. For these, we'll extract all li
elements matching our target class.&start={pageNumber*10}
. For the first page (page 0) of the Software Engineer search, we get this URL:https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=&start=0
pageNumber*10
because we begin counting at 0 and each request yields 10 results. Page 0 (0 * 10) yields results 1 through 10, page 1 yields results 11 through 20, and so on.Our fully formatted URL looks like this:`https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`
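To make the offset math concrete, here is a minimal standalone sketch (not part of the scraper itself) that maps 0-indexed page numbers to start offsets and builds the matching search URLs:
// Minimal sketch: LinkedIn paginates in blocks of 10, so page N maps to start=N*10.
// buildSearchUrl() mirrors the template string used in scrapeSearchResults().
function buildSearchUrl(keyword, locality, pageNumber) {
    const formattedKeyword = keyword.replace(" ", "+");
    const formattedLocality = locality.replace(" ", "+");
    return `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`;
}

// Pages 0, 1 and 2 become start=0, start=10 and start=20.
for (const pageNumber of [0, 1, 2]) {
    console.log(buildSearchUrl("software engineer", "United States", pageNumber));
}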
country
."country": "us"
into the API."country": "uk"
mkdir linkedin-jobs-scraper
cd linkedin-jobs-scraper
npm init --y
npm install puppeteer
npm install csv-writer
npm install csv-parse
fs is built into NodeJS, so there is nothing extra to install for it.
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function scrapeSearchResults(browser, keyword, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; console.log(searchData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, locality, location, retries) { const browser = await puppeteer.launch(); await scrapeSearchResults(browser, keyword, locality, location, retries); await browser.close();} async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); }} main();
main()
, we call startCrawl()
. At the moment, this function opens a browser and passes it into our parsing function, scrapeSearchResults()
.
await puppeteer.launch();
launches the browser.scrapeSearchResults(browser, keyword, locality, location, retries)
.await browser.close();
scrapeSearchResults()
.
divCards
with await page.$$("div[class='base-search-card__info']");
.page.evaluate()
: await page.evaluate(element => element.textContent, nameElement)
. This method is used for the name
, jobTitle
, link
, and jobLocation
.searchData
object and remove the whitespace and any newline characters with the trim()
method.searchData
, we print it to the console.To paginate our results, we add start={pageNumber*10}
to the end of our URL.startCrawl()
to scrape multiple pages.for
loop that allows us to do this. This is only temporary; later on, we'll replace it with more powerful code that performs our search concurrently.`https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`
range()
function similar to the one from Python.function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;}
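As a quick sanity check, the sketch below (using the range() function above) shows that the end value is exclusive, exactly like Python's range():
// range() is exclusive of its end value, just like Python's range().
const pageList = range(0, 3);
console.log(pageList); // [0, 1, 2] -- the 0-indexed page numbers we turn into start offsets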
startCrawl()
. It uses a simple for
loop to iterate through our pages.async function startCrawl(keyword, pages, locality, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, locality, location, retries) } await browser.close();}
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; console.log(searchData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, pages, locality, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, locality, location, retries) } await browser.close();} async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); }} main();
start={pageNumber*10}
allows us to control our pagination. We use pageNumber*10
because we get 10 results per page and our results start at zero.range()
and startCrawl()
, we can now scrape an array of pages.writeToCsv()
function.writeToCsv()
.success
variable and setting it to false
.append
to the fileExists
variable.data
isn't an array, we convert it to one.await csvWriter.writeRecords(data);
to write our data to the CSV file.success
to true
.async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }}
data
to a CSV file.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, pages, locality, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, locality, location, retries) } await browser.close();} async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }} main();
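Before we move on, here is a quick usage sketch of writeToCsv(). The record below is made-up sample data, but it shows how the first call creates the file with a header row while later calls append to it:
// Hypothetical sample record; its keys become the CSV headers.
const sampleRow = {
    name: "Example Corp",
    job_title: "Software Engineer",
    url: "https://www.linkedin.com/jobs/view/0000000000",
    location: "United States"
};

// The first call creates sample.csv and writes the header row; the second call appends.
writeToCsv([sampleRow], "sample.csv")
    .then(() => writeToCsv([sampleRow], "sample.csv"))
    .then(() => console.log("Wrote two rows to sample.csv"));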
async
support to scrape concurrently. We'll rewrite startCrawl()
to handle this.Here is our final startCrawl()
function.for
loop, we create a list of tasks
by splicing from our pageList
up to our concurrencyLimit
.await
all these tasks
to resolve with Promise.all()
.concurrencyLimit
to 5, we'll scrape up to 5 pages at a time.async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();}
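If the splice and Promise.all pattern is new to you, here is a stripped-down sketch with dummy tasks (no browser involved). The while loop keeps pulling concurrencyLimit items off the front of the array, and each batch is awaited before the next one starts:
// Standalone demo of the batching pattern used in startCrawl().
// fakeTask() is a dummy stand-in for scrapeSearchResults().
function fakeTask(pageNumber) {
    return new Promise(resolve => setTimeout(() => {
        console.log(`finished page ${pageNumber}`);
        resolve();
    }, 100));
}

async function runInBatches(items, concurrencyLimit) {
    while (items.length > 0) {
        // splice() removes up to concurrencyLimit items from the front of the array.
        const currentBatch = items.splice(0, concurrencyLimit);
        // Wait for the whole batch to finish before starting the next one.
        await Promise.all(currentBatch.map(item => fakeTask(item)));
    }
}

// With a limit of 5, twelve pages run in batches of 5, 5 and 2.
runInBatches([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 5);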
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); 
console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }} main();
api_key
, url
and a country
.Let's explain these a little better.api_key
: This is literally a key to our ScrapeOps account. Your API key is used to authenticate your account when making requests.url
: This is the url of the site we want to scrape. ScrapeOps will fetch this site and send the result back to us.country
: We pass a country code in for this parameter. ScrapeOps reads our country code and routes our request through a server in the country we chose.function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;}
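For example, wrapping one of our search URLs looks roughly like this; the exact output depends on your API key, but the shape of the proxied URL is the same:
// Wrap a target URL so the request is routed through a US-based server.
const targetUrl = "https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=&start=0";
console.log(getScrapeOpsUrl(targetUrl, "us"));
// Prints something like this (the target URL gets percent-encoded):
// https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.linkedin.com%2F...&country=us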
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; 
const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }} main();
concurrencyLimit
of 5.Feel free to change any of the following from the main()
function.keywords
concurrencyLimit
pages
location
locality
retries
main()
if you'd like to review it.async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 3; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }}
processJob()
. We check for bad responses and throw
an Error
if we don't receive the correct response. If we get a good response, we continue on and parse the page.async function processJob(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await page.goto(url); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const jobCriteria = await page.$$("li[class='description__job-criteria-item']"); if (jobCriteria.length < 4) { throw new Error("Job Criteria Not Found!"); } const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", ""); const positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", ""); const jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", ""); const industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", ""); const jobData = { name: row.name, seniority: seniority.trim(), position_type: positionType.trim(), job_function: jobFunction.trim(), industry: industry.trim() } console.log(jobData) success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } }
jobCriteria = await page.$$("li[class='description__job-criteria-item']");
finds the items from our criteria list.const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", "");
: seniority levelconst positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", "");
: position typeconst jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", "");
: job functionconst industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", "");
: industrypage.evaluate()
to pull the text from each element we find.row
, we need to read the rows from our CSV file. We'll read our file into an array and then we'll use a for
loop to scrape details from every posting we found.Here is our first iteration of processResults()
.Later on, we'll rewrite it and add concurrency support. It's pretty similar to our startCrawl()
function from earlier in this tutorial.async function processResults(csvFile, location, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; for (const row of rows) { await processJob(browser, row, location, retries) } await browser.close(); }
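Each row that readCsv() returns is a plain object keyed by the CSV headers our crawler wrote, so processJob() can read row.url and row.name directly. Here is a quick sketch, assuming the crawl has already produced software-engineer.csv:
// Read the crawl report back in; every record is keyed by the CSV headers.
readCsv("software-engineer.csv").then(rows => {
    // Each row looks like: { name: "...", job_title: "...", url: "...", location: "..." }
    console.log(`Loaded ${rows.length} rows`);
    if (rows.length > 0) {
        console.log(rows[0].name, rows[0].url);
    }
});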
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, 
location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processJob(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await page.goto(url); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const jobCriteria = await page.$$("li[class='description__job-criteria-item']"); if (jobCriteria.length < 4) { throw new Error("Job Criteria Not Found!"); } const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", ""); const positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", ""); const jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", ""); const industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", ""); const jobData = { name: row.name, seniority: seniority.trim(), position_type: positionType.trim(), job_function: jobFunction.trim(), industry: industry.trim() } console.log(jobData) success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } } async function processResults(csvFile, location, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; for (const row of rows) { await processJob(browser, row, location, retries) } await browser.close(); } async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { console.time("processResults"); await processResults(file, location, retries); console.timeEnd("processResults"); } console.log("Scrape complete");} main();
jobData
object. We also already have a writeToCsv()
function. Instead of logging our jobData
to the console, we just need to store it.In the code below, we're going to do exactly that.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const 
tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processJob(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await page.goto(url); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const jobCriteria = await page.$$("li[class='description__job-criteria-item']"); if (jobCriteria.length < 4) { throw new Error("Job Criteria Not Found!"); } const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", ""); const positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", ""); const jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", ""); const industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", ""); const jobData = { name: row.name, seniority: seniority.trim(), position_type: positionType.trim(), job_function: jobFunction.trim(), industry: industry.trim() } await writeToCsv([jobData], `${row.name.replace(" ", "-")}-${row.job_title.replace(" ", "-")}.csv`); success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } } async function processResults(csvFile, location, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; for (const row of rows) { await processJob(browser, row, location, retries) } await browser.close(); } async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { console.time("processResults"); await processResults(file, location, retries); console.timeEnd("processResults"); } console.log("Scrape complete");} main();
jobData
holds the data we pull from the page.jobData
into writeToCsv()
and it then gets saved to a CSV file.tasks
by splicing our rows
by our concurrencyLimit
.await
everything to resolve using Promise.all()
.concurrencyLimit
to 5, we'll be processing the rows
in batches of 5.async function processResults(csvFile, location, concurrencyLimit, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; while (rows.length > 0) { const currentBatch = rows.splice(0, concurrencyLimit); const tasks = currentBatch.map(row => processJob(browser, row, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();}
await readCsv(csvFile);
: This returns all the rows from the CSV file in an array.rows.splice(0, concurrencyLimit);
shrinks the rows
array and gives us a chunk to work with.currentBatch.map(row => processJob(browser, row, location, retries))
runs processJob()
on each element in the chunk.await Promise.all(tasks);
waits for each one of our tasks
to resolve.rows
array is completely gone.const response = await page.goto(getScrapeOpsUrl(url, location), { timeout: 0 });
{ timeout: 0 }
to tell Puppeteer not to time out. When dealing with a proxy along with a site as difficult as LinkedIn, pages sometimes take a while to come back to us.location
Because location is getting passed into our proxy function, we're actually going to be routed through a server in the country of our choice.

const puppeteer = require("puppeteer");
const createCsvWriter = require("csv-writer").createObjectCsvWriter;
const csvParse = require("csv-parse");
const fs = require("fs");

const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key;

async function writeToCsv(data, outputFile) {
    let success = false;
    while (!success) {
        if (!data || data.length === 0) {
            throw new Error("No data to write!");
        }
        const fileExists = fs.existsSync(outputFile);
        if (!(data instanceof Array)) {
            data = [data];
        }
        const headers = Object.keys(data[0]).map(key => ({id: key, title: key}));
        const csvWriter = createCsvWriter({
            path: outputFile,
            header: headers,
            append: fileExists
        });
        try {
            await csvWriter.writeRecords(data);
            success = true;
        } catch (e) {
            console.log("Failed data", data);
            throw new Error("Failed to write to csv");
        }
    }
}

async function readCsv(inputFile) {
    const results = [];
    const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({
        columns: true,
        delimiter: ",",
        trim: true,
        skip_empty_lines: true
    }));
    for await (const record of parser) {
        results.push(record);
    }
    return results;
}

function range(start, end) {
    const array = [];
    for (let i = start; i < end; i++) {
        array.push(i);
    }
    return array;
}

function getScrapeOpsUrl(url, location="us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) {
    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const formattedKeyword = keyword.replace(" ", "+");
        const formattedLocality = locality.replace(" ", "+");
        const page = await browser.newPage();
        try {
            const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`;

            const proxyUrl = getScrapeOpsUrl(url, location);
            await page.goto(proxyUrl, { timeout: 0 });

            console.log(`Successfully fetched: ${url}`);

            const divCards = await page.$$("div[class='base-search-card__info']");

            for (const divCard of divCards) {
                const nameElement = await divCard.$("h4[class='base-search-card__subtitle']");
                const name = await page.evaluate(element => element.textContent, nameElement);

                const jobTitleElement = await divCard.$("h3[class='base-search-card__title']");
                const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement);

                const parentElement = await page.evaluateHandle(element => element.parentElement, divCard);
                const aTag = await parentElement.$("a");
                const link = await page.evaluate(element => element.getAttribute("href"), aTag);

                const jobLocationElement = await divCard.$("span[class='job-search-card__location']");
                const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement);

                const searchData = {
                    name: name.trim(),
                    job_title: jobTitle.trim(),
                    url: link.trim(),
                    location: jobLocation.trim()
                };

                await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`);
            }

            success = true;
        } catch (err) {
            console.log(`Error: ${err}, tries left ${retries - tries}`);
            tries++;
        } finally {
            await page.close();
        }
    }
}

async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) {
    const pageList = range(0, pages);
    const browser = await puppeteer.launch();

    while (pageList.length > 0) {
        const currentBatch = pageList.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }

    await browser.close();
}

async function processJob(browser, row, location, retries = 3) {
    const url = row.url;
    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const page = await browser.newPage();

        try {
            const response = await page.goto(getScrapeOpsUrl(url, location), { timeout: 0 });

            if (!response || response.status() !== 200) {
                throw new Error(`Failed to fetch page, status: ${response?.status()}`);
            }

            const jobCriteria = await page.$$("li[class='description__job-criteria-item']");
            if (jobCriteria.length < 4) {
                throw new Error("Job Criteria Not Found!");
            }

            const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", "");
            const positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", "");
            const jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", "");
            const industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", "");

            const jobData = {
                name: row.name,
                seniority: seniority.trim(),
                position_type: positionType.trim(),
                job_function: jobFunction.trim(),
                industry: industry.trim()
            };

            await writeToCsv([jobData], `${row.name.replace(" ", "-")}-${row.job_title.replace(" ", "-")}.csv`);
            success = true;
            console.log("Successfully parsed", row.url);
        } catch (err) {
            tries++;
            console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`);
        } finally {
            await page.close();
        }
    }
}

async function processResults(csvFile, location, concurrencyLimit, retries) {
    const rows = await readCsv(csvFile);
    const browser = await puppeteer.launch();

    while (rows.length > 0) {
        const currentBatch = rows.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(row => processJob(browser, row, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }

    await browser.close();
}

async function main() {
    const keywords = ["software engineer"];
    const concurrencyLimit = 5;
    const pages = 1;
    const location = "us";
    const locality = "United States";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        console.time("startCrawl");
        await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries);
        console.timeEnd("startCrawl");
        console.log("Crawl complete");
        aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`);
    }

    console.log("Starting scrape");
    for (const file of aggregateFiles) {
        console.time("processResults");
        await processResults(file, location, concurrencyLimit, retries);
        console.timeEnd("processResults");
    }
    console.log("Scrape complete");
}

main();
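If you'd like to see those two options in isolation before running the full scraper, here is a minimal sketch of my own (not part of the script above) that routes a single request through the proxy and disables Puppeteer's navigation timeout. It assumes the same config.json with your ScrapeOps API key; the "gb" country code and the LinkedIn jobs URL are purely illustrative.

const puppeteer = require("puppeteer");
const fs = require("fs");

const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key;

// Same proxy helper as in the full script: the country parameter controls
// which country ScrapeOps routes the request through.
function getScrapeOpsUrl(url, location = "us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

(async () => {
    const browser = await puppeteer.launch();
    const page = await browser.newPage();

    // "gb" is an illustrative value; check ScrapeOps' supported country codes.
    const proxyUrl = getScrapeOpsUrl("https://www.linkedin.com/jobs", "gb");

    // timeout: 0 tells Puppeteer to wait indefinitely instead of aborting a slow proxied load.
    await page.goto(proxyUrl, { timeout: 0 });

    console.log("Loaded:", await page.title());
    await browser.close();
})();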
Now it's time to run the full scraper in production. For this run, pages is set to 3, and you can see the updated main() below. As we mentioned earlier, you can change the following to tweak your results:

- keywords
- concurrencyLimit
- pages
- location
- locality
- retries
async function main() {
    const keywords = ["software engineer"];
    const concurrencyLimit = 5;
    const pages = 3;
    const location = "us";
    const locality = "United States";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        console.time("startCrawl");
        await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries);
        console.timeEnd("startCrawl");
        console.log("Crawl complete");
        aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`);
    }

    console.log("Starting scrape");
    for (const file of aggregateFiles) {
        console.time("processResults");
        await processResults(file, location, concurrencyLimit, retries);
        console.timeEnd("processResults");
    }
    console.log("Scrape complete");
}
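As an illustration (these values are my own, not from the script above), if you wanted to crawl two job searches routed through the UK instead, only the constants at the top of main() would change:

async function main() {
    // Hypothetical configuration: two searches, routed through the UK.
    const keywords = ["software engineer", "data engineer"];
    const concurrencyLimit = 5;
    const pages = 2;                    // pages of results per keyword
    const location = "gb";              // proxy country code (check ScrapeOps' supported list)
    const locality = "United Kingdom";  // location string passed to LinkedIn's search
    const retries = 3;
    // ...the rest of main() stays exactly the same as shown above.
}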
We're not technically bound by LinkedIn's terms of service or its robots.txt because we haven't agreed to anything, but they take these policies very seriously. Their terms are available here and their robots.txt is here. As stated at the top of their robots.txt, crawling LinkedIn is explicitly prohibited. By scraping LinkedIn, you can have your account suspended, banned, or even deleted.