Then check out ScrapeOps, the complete toolkit for web scraping.
config.json file.{"api_key": "your-super-secret-api-key"}.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass JobData: name: str = "" seniority: str = "" position_type: str = "" job_function: str = "" industry: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. 
value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="base-search-card__info") for div_card in div_cards: company_name = div_card.find("h4", class_="base-search-card__subtitle").text job_title = div_card.find("h3", class_="base-search-card__title").text link = div_card.parent.find("a") job_link = link.get("href") location = div_card.find("span", class_="job-search-card__location").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries 
exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_posting(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code != 200: raise Exception(f"Failed Request, status code: {response.status_code}") logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") job_criteria = soup.find_all("li", class_="description__job-criteria-item") seniority = job_criteria[0].text.replace("Seniority level", "") position_type = job_criteria[1].text.replace("Employment type", "") job_function = job_criteria[2].text.replace("Job function", "") industry = job_criteria[3].text.replace("Industries", "") job_data = JobData( name=row["name"], seniority=seniority, position_type=position_type, job_function=job_function, industry=industry ) job_pipeline.add_data(job_data) job_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_posting, reader, [location] * len(reader), 
[retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 3 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
MAX_RETRIES: Defines the maximum number of times the script will attempt to retrieve a webpage if the initial request fails (e.g., due to network issues or rate limiting). MAX_THREADS: Sets the maximum number of threads that the script will use concurrently during scraping. PAGES: The number of pages of job listings to scrape for each keyword. LOCATION: The country code or identifier for the region from which job listings should be scraped (e.g., "us" for the United States). LOCALITY: The textual representation of the location where the jobs are being scraped (e.g., "United States"). keyword_list: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]). python name_of_your_script.py. You'll get a CSV named after the keyword you searched. Then, you'll get an individual CSV report on each job as well. ThreadPoolExecutor to add support for multithreading and therefore concurrency. ThreadPoolExecutor to scrape posting data concurrently. https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location={formatted_locality}&original_referer=
div card with a class name, base-search-card__info. li element with a class name, description__job-criteria-item. base-search-card__info. In this next image, you'll see one of the li items that we would extract. &start={page_number*10}. Our full URL for page 1 of the Software Engineer search would look like this: https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=&start=0
page_number*10 because we begin counting at 0 and each request yields 10 results. Page 0 (0 * 10) gives us results 1 through 10. Page 1 gives us 11 through 20, and so on. Inside our Python code, the URL would look like this: f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" country. "country": "us" into the API. "country": "uk". mkdir linkedin-jobs-scraper && cd linkedin-jobs-scraper
python -m venv venvsource venv/bin/activatepip install requestspip install beautifulsoup4scrape_search_results().import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, locality, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="base-search-card__info") for div_card in div_cards: company_name = div_card.find("h4", class_="base-search-card__subtitle").text job_title = div_card.find("h3", class_="base-search-card__title").text link = div_card.parent.find("a") job_link = link.get("href") location = div_card.find("span", class_="job-search-card__location").text search_data = { "name": company_name, "job_title": job_title, "url": job_link, "location": location } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": 
MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 3 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: scrape_search_results(keyword, LOCATION, LOCALITY) logger.info(f"Crawl complete.")
soup.find_all("div", class_="base-search-card__info") to find all of our base result cards.div_card.find("h4", class_="base-search-card__subtitle").text finds our company_name.h3, so we use div_card.find("h3", class_="base-search-card__title").text to find it.div_card.parent.find("a").href from the link element with link.get("href").div_card.find("span", class_="job-search-card__location").text gets the job location from the card.start={page_number*10} to the end of our URL.  We also need a function that allows us to scrape multiple pages, we'll call it start_scrape().Our fully paginated urls are laid out in the snippet you see below.url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"start_scrape() is in our next snippet.  At the moment, it's just a simple for loop that parses pages using iteration.  Later on, we'll make some improvements to it.def start_scrape(keyword, pages, location, locality, retries=3): for page in range(pages): scrape_search_results(keyword, location, locality, page_number, retries=retries)
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

# The API key is read from a local config.json of the form {"api_key": "..."}.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, page_number, retries=3):
    """Fetch one page of LinkedIn job search results and print each result.

    Args:
        keyword: Search phrase; spaces are replaced with "+" for the URL.
        location: Country code supplied by the caller. This version requests
            the page directly, so the value is accepted but not used.
        locality: Human-readable location; spaces are replaced with "+".
        page_number: Zero-based page index; the request offset is
            ``page_number * 10``.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")
            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                # Use a separate name so the `location` parameter is not
                # overwritten by the scraped job location.
                job_location = div_card.find("span", class_="job-search-card__location").text

                search_data = {
                    "name": company_name,
                    "job_title": job_title,
                    "url": job_link,
                    "location": job_location,
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            # Any failure (HTTP status, missing element, network) triggers a retry.
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, retries=3):
    """Scrape pages ``0 .. pages - 1`` of the search results sequentially."""
    for page in range(pages):
        # Pass the loop variable; the original passed the undefined name
        # `page_number`, which raised NameError on the first iteration.
        scrape_search_results(keyword, location, locality, page, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
start={page_number*10} gives us the ability to control pagination inside our url.start_scrape() allows us to parse a list of pages.dataclass called SearchData.DataPipeline.  SearchData simply needs to represent individual search items.DataPipeline needs to open a pipe to a CSV file and store SearchData objects inside our CSV.Here is our SearchData.  It holds the name, job_title, url and location that we find during the parse.@dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
SearchData, it gets passed into the DataPipeline you see below.  The DataPipeline first checks to see if our CSV file exists.  If it exists, we append the file.If the file doesn't exist, we create one.  This approach stops us from accidentally destroying important data.  This class also filters out duplicates using the name attribute.class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv()
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

# The API key is read from a local config.json of the form {"api_key": "..."}.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    """One job card from the search results page."""

    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace empty string fields with "No <field>" and strip the rest."""
        # `data_field` avoids shadowing the imported `dataclasses.field`.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, data_field.name, f"No {data_field.name}")
                continue
            setattr(self, data_field.name, value.strip())


class DataPipeline:
    """Queue dataclass records, drop duplicate names, and append them to a CSV."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag; the original left it set on the
            # empty-queue early return, which blocked every later flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if `input_data.name` was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate record and flush once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Flush any records still queued."""
        if self.csv_file_open:
            # Give an in-progress write a moment to finish before the last flush.
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    """Fetch one page of LinkedIn job search results into `data_pipeline`.

    Args:
        keyword: Search phrase; spaces are replaced with "+" for the URL.
        location: Country code supplied by the caller. This version requests
            the page directly, so the value is accepted but not used.
        locality: Human-readable location; spaces are replaced with "+".
        page_number: Zero-based page index; the request offset is
            ``page_number * 10``.
        data_pipeline: Object whose ``add_data`` receives each `SearchData`.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")
            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                # Use a separate name so the `location` parameter is not
                # overwritten by the scraped job location.
                job_location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=job_location,
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            # Any failure (HTTP status, missing element, network) triggers a retry.
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3):
    """Scrape pages ``0 .. pages - 1`` sequentially into `data_pipeline`."""
    for page in range(pages):
        # Pass the loop variable; the original passed the undefined name
        # `page_number`, which raised NameError on the first iteration.
        scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
SearchData to represent individual results from our search results page. DataPipeline is used to store these objects in a safe and efficient way. ThreadPoolExecutor and we're going to remove our for loop from start_scrape(). ThreadPoolExecutor allows us to open a pool with max_threads. If we want to use 4 threads, we pass max_threads=4. def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages )
executor.map() go as follows:scrape_search_results: the function we want to call on all these available threads.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="base-search-card__info") for div_card in div_cards: company_name = div_card.find("h4", class_="base-search-card__subtitle").text job_title = div_card.find("h3", class_="base-search-card__title").text link = div_card.parent.find("a") job_link = link.get("href") location = div_card.find("span", class_="job-search-card__location").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, 
data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 3 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
url and a location.  Along with these, the function will handle some set parameters and spit out a ScrapeOps proxied URL.Take a look at get_scrapeops_url().def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url
payload."api_key": our ScrapeOps API key."url": the url we want to scrape."country": the country we want to appear in.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. 
value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div", class_="base-search-card__info") for div_card in div_cards: company_name = div_card.find("h4", class_="base-search-card__subtitle").text job_title = div_card.find("h3", class_="base-search-card__title").text link = div_card.parent.find("a") job_link = link.get("href") location = div_card.find("span", class_="job-search-card__location").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries 
exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 3 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
# MAX_RETRIES: Defines the maximum number of times the script will attempt to
#   retrieve a webpage if the initial request fails (e.g., due to network
#   issues or rate limiting).
# MAX_THREADS: Sets the maximum number of threads that the script will use
#   concurrently during scraping.
# PAGES: The number of pages of job listings to scrape for each keyword.
# LOCATION: The country code or identifier for the region from which job
#   listings should be scraped (e.g., "us" for the United States).
# LOCALITY: The textual representation of the location where the jobs are
#   being scraped (e.g., "United States").
# keyword_list: A list of keywords representing job titles or roles to search
#   for on LinkedIn (e.g., ["software engineer"]).
if __name__ == "__main__":
    # Crawl settings
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with dashes for spaces.
        filename = keyword.replace(" ", "-")
        csv_path = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_path)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            LOCALITY,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        # Flush whatever is still queued before moving to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_path)

    logger.info("Crawl complete.")
# process_posting().  Like before, pay close attention to our parsing logic.
def process_posting(row, location, retries=3):
    """Fetch one job posting and print its job-criteria fields.

    Args:
        row: Mapping with at least "url" (posting URL) and "name" keys.
        location: Kept for interface compatibility; this version requests the
            page directly, so the value is not used.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # requests.get() has no `location` keyword; passing it raised
            # TypeError. The call lives inside the try so request errors are
            # retried like parsing errors.
            response = requests.get(url)
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            job_criteria = soup.find_all("li", class_="description__job-criteria-item")

            # The four criteria items are read by position; a page with fewer
            # items raises IndexError, which is handled as a failed attempt.
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = {
                "name": row["name"],
                "seniority": seniority,
                "position_type": position_type,
                "job_function": job_function,
                "industry": industry
            }

            print(job_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
# soup.find_all("li", class_="description__job-criteria-item") finds all of our
# criteria pieces.
#   job_criteria[0]: seniority level
#   job_criteria[1]: position type
#   job_criteria[2]: job function
#   job_criteria[3]: industry
# for loop to call process_posting() on each row from the file.
# Here is our first iteration of process_results().  Later on, we'll rewrite it
# and add multithreading support.
def process_results(csv_file, location, retries=3):
    """Run process_posting() on every row of a crawl CSV, one row at a time.

    Args:
        csv_file: Path of the CSV file to read.
        location: Passed through to process_posting().
        retries: Passed through to process_posting().
    """
    logger.info(f"processing {csv_file}")
    # The crawl CSV is written as UTF-8, so read it back with the same
    # encoding instead of the platform default.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_posting(row, location, retries=retries)
import concurrent.futures
import csv
import json
import logging
import os
import time
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode

import requests
from bs4 import BeautifulSoup

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL wrapping *url* for country *location*."""
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    """One job card scraped from a search results page."""

    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace blank ones with "No <field>"."""
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")


class DataPipeline:
    """Queue scraped records, drop duplicate names, and append them to a CSV."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued records to the CSV file and empty the queue."""
        if not self.storage_queue:
            return

        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()

            keys = [spec.name for spec in fields(data_to_save[0])]
            # Only write the header when the file is new or empty.
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always release the flag; otherwise add_data() would never flush
            # again after an early exit or a write error.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same name was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue *scraped_data* unless it is a duplicate; flush when full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued."""
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    """Scrape one page of job search results into *data_pipeline*.

    Args:
        keyword: Search keywords; spaces are replaced with "+".
        location: Country code passed to the ScrapeOps proxy.
        locality: Location text for the search; spaces are replaced with "+".
        page_number: Zero-based page index (10 results per page).
        data_pipeline: Pipeline that receives each SearchData record.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")
            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                # Use a separate name so the `location` parameter (proxy
                # country) is still intact if this page has to be retried.
                job_location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=job_location
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    """Scrape *pages* result pages concurrently on up to *max_threads* threads."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # NOTE: the iterator returned by map() is never consumed, so an
        # exception raised inside a worker is not re-raised here.
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_posting(row, location, retries=3):
    """Fetch one job posting and print its job-criteria fields.

    Args:
        row: Mapping with at least "url" (posting URL) and "name" keys.
        location: Kept for interface compatibility; this version requests the
            page directly, so the value is not used.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # requests.get() has no `location` keyword; passing it raised
            # TypeError. The call lives inside the try so request errors are
            # retried like parsing errors.
            response = requests.get(url)
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            job_criteria = soup.find_all("li", class_="description__job-criteria-item")

            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = {
                "name": row["name"],
                "seniority": seniority,
                "position_type": position_type,
                "job_function": job_function,
                "industry": industry
            }

            print(job_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    """Run process_posting() on every row of *csv_file*, one row at a time."""
    logger.info(f"processing {csv_file}")
    # DataPipeline writes the CSV as UTF-8, so read it back the same way.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_posting(row, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
# DataPipeline.  Storing our data will be very easy at this point.  We just
# need another dataclass.  Take a look below at JobData.
# Just like our SearchData from earlier, we use it to represent the data we
# scraped from the page.
@dataclass
class JobData:
    """Job-criteria fields scraped from a single job posting page."""

    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace blank ones with "No <field>"."""
        for spec in fields(self):
            value = getattr(self, spec.name)
            # Only string values are normalized; anything else is left alone.
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text
            # instead of being stored as an empty string.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")
# DataPipeline.  Then, instead of printing our parsed data, we create a JobData
# object out of it and then pass our JobData into the pipeline.
import concurrent.futures
import csv
import json
import logging
import os
import time
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode

import requests
from bs4 import BeautifulSoup

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL wrapping *url* for country *location*."""
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    """One job card scraped from a search results page."""

    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace blank ones with "No <field>"."""
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")


@dataclass
class JobData:
    """Job-criteria fields scraped from a single job posting page."""

    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace blank ones with "No <field>"."""
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")


class DataPipeline:
    """Queue scraped records, drop duplicate names, and append them to a CSV."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued records to the CSV file and empty the queue."""
        if not self.storage_queue:
            return

        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()

            keys = [spec.name for spec in fields(data_to_save[0])]
            # Only write the header when the file is new or empty.
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always release the flag; otherwise add_data() would never flush
            # again after an early exit or a write error.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same name was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue *scraped_data* unless it is a duplicate; flush when full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued."""
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    """Scrape one page of job search results into *data_pipeline*.

    Args:
        keyword: Search keywords; spaces are replaced with "+".
        location: Country code passed to the ScrapeOps proxy.
        locality: Location text for the search; spaces are replaced with "+".
        page_number: Zero-based page index (10 results per page).
        data_pipeline: Pipeline that receives each SearchData record.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")
            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                # Use a separate name so the `location` parameter (proxy
                # country) is still intact if this page has to be retried.
                job_location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=job_location
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    """Scrape *pages* result pages concurrently on up to *max_threads* threads."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # NOTE: the iterator returned by map() is never consumed, so an
        # exception raised inside a worker is not re-raised here.
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_posting(row, location, retries=3):
    """Fetch one job posting and store its job criteria in a per-company CSV.

    Args:
        row: Mapping with at least "url" (posting URL) and "name" keys.
        location: Kept for interface compatibility; this version requests the
            page directly, so the value is not used.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # requests.get() has no `location` keyword; passing it raised
            # TypeError. The call lives inside the try so request errors are
            # retried like parsing errors.
            response = requests.get(url)
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            job_criteria = soup.find_all("li", class_="description__job-criteria-item")

            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = JobData(
                name=row["name"],
                seniority=seniority,
                position_type=position_type,
                job_function=job_function,
                industry=industry
            )

            # The output file is named after the company, dashes for spaces.
            job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            job_pipeline.add_data(job_data)
            job_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
    """Run process_posting() on every row of *csv_file*, one row at a time."""
    logger.info(f"processing {csv_file}")
    # DataPipeline writes the CSV as UTF-8, so read it back the same way.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_posting(row, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, retries=MAX_RETRIES)
# JobData holds the data we pull from the page.
# DataPipeline takes a JobData object and pipes it to a CSV file.
# ThreadPoolExecutor like we did earlier.
# Take a look at our refactored version of process_results().
def process_results(csv_file, location, max_threads=5, retries=3):
    """Run process_posting() on every row of a crawl CSV using a thread pool.

    Args:
        csv_file: Path of the CSV file to read.
        location: Passed to process_posting() for every row.
        max_threads: Maximum number of worker threads.
        retries: Passed to process_posting() for every row.
    """
    logger.info(f"processing {csv_file}")
    # The crawl CSV is written as UTF-8, so read it back with the same
    # encoding instead of the platform default.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            # map() takes one iterable per positional argument of
            # process_posting. NOTE: its result iterator is never consumed,
            # so an exception raised inside a worker is not re-raised here.
            executor.map(
                process_posting,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
# executor.map():
#   process_posting: the function we want to call on multiple threads.
#   process_posting get passed in as arrays.
# response = requests.get(get_scrapeops_url(url, location=location))
import concurrent.futures
import csv
import json
import logging
import os
import time
from dataclasses import dataclass, field, fields, asdict
from urllib.parse import urlencode

import requests
from bs4 import BeautifulSoup

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL wrapping *url* for country *location*."""
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    """One job card scraped from a search results page."""

    name: str = ""
    job_title: str = ""
    url: str = ""
    location: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace blank ones with "No <field>"."""
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")


@dataclass
class JobData:
    """Job-criteria fields scraped from a single job posting page."""

    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and replace blank ones with "No <field>"."""
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned or f"No {spec.name}")


class DataPipeline:
    """Queue scraped records, drop duplicate names, and append them to a CSV."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued records to the CSV file and empty the queue."""
        if not self.storage_queue:
            return

        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()

            keys = [spec.name for spec in fields(data_to_save[0])]
            # Only write the header when the file is new or empty.
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always release the flag; otherwise add_data() would never flush
            # again after an early exit or a write error.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same name was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue *scraped_data* unless it is a duplicate; flush when full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write any records still queued."""
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3):
    """Scrape one page of job search results into *data_pipeline*.

    Args:
        keyword: Search keywords; spaces are replaced with "+".
        location: Country code passed to the ScrapeOps proxy.
        locality: Location text for the search; spaces are replaced with "+".
        page_number: Zero-based page index (10 results per page).
        data_pipeline: Pipeline that receives each SearchData record.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div", class_="base-search-card__info")
            for div_card in div_cards:
                company_name = div_card.find("h4", class_="base-search-card__subtitle").text
                job_title = div_card.find("h3", class_="base-search-card__title").text
                link = div_card.parent.find("a")
                job_link = link.get("href")
                # Use a separate name so the `location` parameter (proxy
                # country) is still intact if this page has to be retried.
                job_location = div_card.find("span", class_="job-search-card__location").text

                search_data = SearchData(
                    name=company_name,
                    job_title=job_title,
                    url=job_link,
                    location=job_location
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3):
    """Scrape *pages* result pages concurrently on up to *max_threads* threads."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # NOTE: the iterator returned by map() is never consumed, so an
        # exception raised inside a worker is not re-raised here.
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            [locality] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_posting(row, location, retries=3):
    """Fetch one job posting via the proxy and store its criteria in a CSV.

    Args:
        row: Mapping with at least "url" (posting URL) and "name" keys.
        location: Country code passed to the ScrapeOps proxy.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request is made inside the try so connection errors are
            # retried instead of escaping the loop on the first failure.
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code != 200:
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            job_criteria = soup.find_all("li", class_="description__job-criteria-item")

            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = JobData(
                name=row["name"],
                seniority=seniority,
                position_type=position_type,
                job_function=job_function,
                industry=industry
            )

            # The output file is named after the company, dashes for spaces.
            job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            job_pipeline.add_data(job_data)
            job_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    """Run process_posting() on every row of *csv_file* using a thread pool."""
    logger.info(f"processing {csv_file}")
    # DataPipeline writes the CSV as UTF-8, so read it back the same way.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            # NOTE: the iterator returned by map() is never consumed, so an
            # exception raised inside a worker is not re-raised here.
            executor.map(
                process_posting,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
# main, you can see it again below.
if __name__ == "__main__":
    # Crawl settings
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 3
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with dashes for spaces.
        filename = keyword.replace(" ", "-")
        csv_path = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_path)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            LOCALITY,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        # Flush whatever is still queued before moving to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_path)

    logger.info("Crawl complete.")

    # Second pass: visit every posting listed in each crawl CSV.
    for file in aggregate_files:
        process_results(
            file,
            LOCATION,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
robots.txt.  Their terms are available here and their robots.txt is here. As stated at the top of their robots.txt, crawling LinkedIn is explicitly prohibited.  By scraping LinkedIn, you can have your account suspended, banned, or even deleted. Always ensure compliance with LinkedIn's policies and consider using official APIs or getting explicit permission for large-scale data extraction.
Then check out ScrapeOps, the complete toolkit for web scraping.
config.json file.{"api_key": "your-super-secret-api-key"}.python name_of_your_script.py is the command you'll use to run the scraper.import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass JobData: name: str = "" seniority: str = "" position_type: str = "" job_function: str = "" industry: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. 
value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: 
{e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_posting(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(get_scrapeops_url(url, location=location)) job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") seniority = job_criteria[0].text.replace("Seniority level", "") position_type = job_criteria[1].text.replace("Employment type", "") job_function = job_criteria[2].text.replace("Job function", "") industry = job_criteria[3].text.replace("Industries", "") job_data = JobData( name=row["name"], seniority=seniority, position_type=position_type, job_function=job_function, industry=industry ) job_pipeline.add_data(job_data) job_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_posting, 
reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
main to fine-tune your results:
- MAX_RETRIES: Defines the maximum number of times the script will attempt to retrieve a webpage again if the initial request fails (e.g., due to network issues or rate limiting).
- MAX_THREADS: Sets the maximum number of threads that the script will use concurrently during scraping.
- PAGES: The number of pages of job listings to scrape for each keyword.
- LOCATION: The country code passed to the ScrapeOps proxy as its country setting, i.e. the region the requests are routed through (e.g., "us" for the United States).
- LOCALITY: The textual location used in the LinkedIn search query (e.g., "United States").
- keyword_list: A list of keywords representing job titles or roles to search for on LinkedIn (e.g., ["software engineer"]).

ThreadPoolExecutor to add support for multithreading and therefore concurrency.
ThreadPoolExecutor to scrape posting data concurrently.

https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location={formatted_locality}&original_referer=
https://www.linkedin.com/jobs-guest/jobs/api), you might notice something interesting.  We're actually making API requests, hence the endpoint, /api.Something even more interesting, this API endpoint doesn't give us JSON or XML, it sends back straight HTML.  In years of web development and scraping, LinkedIn is the only place I've ever seen something like this.The screenshot below gives us a barebones HTML page without any styling whatsoever, but it is in fact a webpage.Once we're finished with our search, we'll scrape individual listings.  Take a look at the shot below.  This is the basic layout for any job posted on LinkedIn.  We don't need to worry about the urls for these.  We'll be extracting these urls during our scrape.div elements.base-search-card__info.li elements with a class of description__job-criteria-item.div.  Its class name is base-search-card__info.  To extract this data, we need to find each div that matches this class.Here is the type of li element we want to scrape.  Each li element is given the classname, description__job-criteria-item.  So for these, we want to pull all li elements with this class.&start={page_number*10}.Our full URL for page 1 of the Software Engineer search would look like this:page_number*10 because we begin counting at 0 and each request yields 10 results.f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"country."country": "us" into the API."country": "uk".mkdir linkedin-jobs-scraper cd linkedin-jobs-scraper
python -m venv venvsource venv/bin/activatepip install seleniumscrape_search_results().import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, locality, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = { "name": company_name, "job_title": job_title, "url": job_link, "location": location } print(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page 
{url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") scrape_search_results(keyword, LOCATION, LOCALITY, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
options = webdriver.ChromeOptions().  Then we use options.add_argument("--headless") to set our browser to headless mode.driver = webdriver.Chrome(options=options) launches Selenium with our custom options.driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") to find all of our base result cards.company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text finds our company_name.h3, so we use div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text to find it.parent of the div_card: div_card.find_element(By.XPATH, "..").  We use the XPATH and pass in .. to find the parent.parent.find_element(By.CSS_SELECTOR, "a").href from the link element with link.get_attribute("href").div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text gets the job location from the card.start={page_number*10} to the end of our URL.  We need an additional function to scrape multiple pages.  We'll call it start_scrape().Our fully paginated urls are laid out in the snippet you see below.url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"start_scrape() is in our next snippet.  At the moment, it's just a simple for loop that parses pages using iteration.  Later on, we'll make some improvements to it.def start_scrape(keyword, pages, location, locality, retries=3): for page in pages: scrape_search_results(keyword, location, locality, page, retries=retries)
import os
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
import logging
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

# Every driver launched below runs Chrome without a visible window.
options = webdriver.ChromeOptions()
options.add_argument("--headless")

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, locality, page_number, retries=3):
    """Load one page of LinkedIn job search results and print each job card.

    Args:
        keyword: Search phrase; spaces are sent as "+".
        location: Country code; accepted but not used by this version.
        locality: Search location text; spaces are sent as "+".
        page_number: Zero-based page index; the request starts at
            ``page_number * 10``.
        retries: Number of additional attempts made after the first one fails.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    formatted_locality = locality.replace(" ", "+")
    url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}"
    tries = 0
    success = False

    while tries <= retries and not success:
        # A fresh browser per attempt; it is always closed in the finally clause.
        driver = webdriver.Chrome(options=options)
        try:
            driver.get(url)

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']")
            if not div_cards:
                driver.save_screenshot("debug.png")
                raise Exception("Page did not load correctly, please check debug.png")

            for div_card in div_cards:
                company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text
                print("company name", company_name)
                job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text
                # The job link is a sibling of the info div, so look it up from the parent element.
                parent = div_card.find_element(By.XPATH, "..")
                link = parent.find_element(By.CSS_SELECTOR, "a")
                job_link = link.get_attribute("href")
                # Named job_location so the `location` argument is not overwritten.
                job_location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text

                search_data = {
                    "name": company_name,
                    "job_title": job_title,
                    "url": job_link,
                    "location": job_location
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, locality, retries=3):
    """Scrape result pages 0 .. pages-1 one after another.

    Args:
        pages: Number of pages to scrape (an int, as passed from PAGES below).
    """
    # `pages` is a count, so it has to be expanded with range() before iterating.
    for page in range(pages):
        scrape_search_results(keyword, location, locality, page, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"
    LOCALITY = "United States"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        start_scrape(keyword, PAGES, LOCATION, LOCALITY, retries=MAX_RETRIES)

    logger.info("Crawl complete.")
start={page_number*10} controls our pagination.start_scrape(), we can parse a list of pages.dataclass called SearchData.  The second one is our DataPipeline.SearchData simply needs to represent individual search items.DataPipeline needs to open a pipe to a CSV file and store SearchData objects inside our CSV.SearchData.  It holds the name, job_title, url and location that we find during the parse.@dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
SearchData, we pass it into the DataPipeline you see below. DataPipeline first checks to see if our CSV file exists.
# name attribute.
class DataPipeline:
    """Queue scraped dataclass items in memory and append them to a CSV file.

    Items whose ``name`` has already been seen are dropped. The queue is
    written out when it reaches ``storage_queue_limit`` items and again when
    ``close_pipeline()`` is called.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write; csv_file_open must stay False so later flushes still run.
            return

        self.csv_file_open = True
        try:
            keys = [item_field.name for item_field in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset the flag even if the write fails, otherwise add_data() would never flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write out whatever is still queued."""
        if self.csv_file_open:
            # Local import: the script's import block does not include `time`.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left 
{retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, retries=3): for page in pages: scrape_search_results(keyword, location, locality, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
SearchData to represent individual results from our search results page.DataPipeline is used to store these objects in a safe and efficient way.ThreadPoolExecutor and we're going to remove our for loop from start_scrape().ThreadPoolExecutor allows us to open a pool with max_threads.  If we want to use 4 threads, we pass max_threads=4.def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages )
executor.map() go as follows:scrape_search_results: the function we want to call on all these available threads.import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. 
value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left 
{retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
# get_scrapeops_url().
def get_scrapeops_url(url, location="us"):
    """Build the ScrapeOps proxy URL for a target page.

    Args:
        url: The page to fetch through the proxy.
        location: Value sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the API key, target URL and country
        URL-encoded into its query string.
    """
    query = urlencode({
        "api_key": API_KEY,
        "url": url,
        "country": location,
    })
    return "https://proxy.scrapeops.io/v1/?" + query
payload."api_key": our ScrapeOps API key."url": the url we want to scrape."country": the country we want to appear in.import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. 
value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: 
{e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
MAX_RETRIESMAX_THREADSPAGESLOCATIONLOCALITYkeyword_listif __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 3 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
# process_posting().  Like before, pay close attention to our parsing logic.
def process_posting(row, location, retries=3):
    """Open one job posting and print its four job-criteria fields.

    Args:
        row: Mapping with at least the keys "url" and "name".
        location: Country code forwarded to get_scrapeops_url().
        retries: Number of additional attempts made after the first one fails.

    Raises:
        Exception: If no attempt succeeds.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        # A fresh browser per attempt; it is always closed in the finally clause.
        driver = webdriver.Chrome(options=options)
        try:
            # WebDriver.get() takes only the URL, so the country is applied via the proxy URL.
            driver.get(get_scrapeops_url(url, location=location))

            job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']")
            # Each item's text includes its label; remove the label to keep the value.
            # Fewer than four items raises IndexError, which is handled as a failed attempt below.
            seniority = job_criteria[0].text.replace("Seniority level", "")
            position_type = job_criteria[1].text.replace("Employment type", "")
            job_function = job_criteria[2].text.replace("Job function", "")
            industry = job_criteria[3].text.replace("Industries", "")

            job_data = {
                "name": row["name"],
                "seniority": seniority,
                "position_type": position_type,
                "job_function": job_function,
                "industry": industry
            }
            print(job_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
# driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") finds all the items from our criteria list.
# job_criteria[0]: seniority level
# job_criteria[1]: position type
# job_criteria[2]: job function
# job_criteria[3]: industry
# for loop to scrape details from every posting we found.
# Here is our first iteration of process_results().  Later on, we'll rewrite it and add multithreading support.
def process_results(csv_file, location, retries=3):
    """Run process_posting() on every row of a crawl CSV, one row at a time.

    Args:
        csv_file: Path of the CSV file to read; each row is passed on as a dict.
        location: Forwarded to process_posting().
        retries: Forwarded to process_posting().
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its CSV as UTF-8, so read it back with the same encoding
    # instead of the platform default.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_posting(row, location, retries=retries)
import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. 
value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: 
{e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_posting(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url, location=location) job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") seniority = job_criteria[0].text.replace("Seniority level", "") position_type = job_criteria[1].text.replace("Employment type", "") job_function = job_criteria[2].text.replace("Job function", "") industry = job_criteria[3].text.replace("Industries", "") job_data = { "name": row["name"], "seniority": seniority, "position_type": position_type, "job_function": job_function, "industry": industry } print(job_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_posting(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape 
keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
# DataPipeline.  We just need another dataclass.  Take a look below at JobData.
# Just like our SearchData from earlier, we use it to represent the data we
# scraped from the page.
# We'll pass this into our DataPipeline which will then pipe our data into a CSV file.
@dataclass
class JobData:
    """Details scraped from a single job posting page.

    Every string field is stripped of surrounding whitespace on construction,
    and any field that is empty afterwards is replaced with ``"No <field name>"``.
    """

    name: str = ""
    seniority: str = ""
    position_type: str = ""
    job_function: str = ""
    industry: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Normalise all string fields in place (strip, then apply default text)."""
        # ``data_field`` avoids shadowing the imported ``dataclasses.field``.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also get the default text
            # instead of being stored as an empty string.
            cleaned = value.strip()
            setattr(self, data_field.name, cleaned or f"No {data_field.name}")
DataPipeline.  Then, instead of printing our parsed data, we create a JobData object out of it and then pass our JobData into the pipeline.import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass JobData: name: str = "" seniority: str = "" position_type: str = "" job_function: str = "" industry: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. 
value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: 
{e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_posting(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(url, location=location) job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") seniority = job_criteria[0].text.replace("Seniority level", "") position_type = job_criteria[1].text.replace("Employment type", "") job_function = job_criteria[2].text.replace("Job function", "") industry = job_criteria[3].text.replace("Industries", "") job_data = JobData( name=row["name"], seniority=seniority, position_type=position_type, job_function=job_function, industry=industry ) job_pipeline.add_data(job_data) job_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_posting(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION 
= "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, retries=MAX_RETRIES)
# JobData holds the data we pull from the page.
# DataPipeline takes a JobData object and pipes it to a CSV file.
# ThreadPoolExecutor for concurrency just like we did earlier.
# Take a look at our refactored version of process_results().
def process_results(csv_file, location, max_threads=5, retries=3):
    """Process every posting listed in a crawl CSV on a pool of threads.

    Args:
        csv_file: Path of the CSV to read; each row is passed to
            ``process_posting`` as a dict keyed by the header row.
        location: Forwarded unchanged to ``process_posting``.
        max_threads: Maximum number of worker threads.
        retries: Forwarded to ``process_posting`` as its retry budget.

    A posting that still fails after its retries is logged and skipped; it
    does not stop the remaining postings from being processed.
    """
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # submit() + as_completed() lets us inspect each task's outcome.
        # The results of executor.map() were never consumed here, so any
        # exception raised inside process_posting was silently lost.
        pending = {
            executor.submit(process_posting, row, location, retries): row
            for row in reader
        }
        for future in concurrent.futures.as_completed(pending):
            error = future.exception()
            if error is not None:
                failed_row = pending[future]
                logger.error(f"Failed to process posting {failed_row.get('url')}: {error}")
executor.map():process_posting: the function we want to call on multiple threads.process_posting get passed in as arrays.driver.get(get_scrapeops_url(url, location=location))import osimport csvimport jsonfrom selenium import webdriverfrom selenium.webdriver.common.by import Byimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] options = webdriver.ChromeOptions()options.add_argument("--headless") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" job_title: str = "" url: str = "" location: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass JobData: name: str = "" seniority: str = "" position_type: str = "" job_function: str = "" industry: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. 
value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, locality, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") formatted_locality = locality.replace(" ", "+") url = f"https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords={formatted_keyword}&location={formatted_locality}&original_referer=&start={page_number*10}" tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[class='base-search-card__info']") if not div_cards: driver.save_screenshot("debug.png") raise Exception("Page did not load correctly, please check debug.png") for div_card in div_cards: company_name = div_card.find_element(By.CSS_SELECTOR, "h4[class='base-search-card__subtitle']").text print("company name", company_name) job_title = div_card.find_element(By.CSS_SELECTOR, "h3[class='base-search-card__title']").text parent = div_card.find_element(By.XPATH, "..") link = parent.find_element(By.CSS_SELECTOR, "a") job_link = link.get_attribute("href") location = div_card.find_element(By.CSS_SELECTOR, "span[class='job-search-card__location']").text search_data = SearchData( name=company_name, job_title=job_title, url=job_link, location=location ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: 
{e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, locality, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, [locality] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_posting(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(get_scrapeops_url(url, location=location)) job_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") job_criteria = driver.find_elements(By.CSS_SELECTOR, "li[class='description__job-criteria-item']") seniority = job_criteria[0].text.replace("Seniority level", "") position_type = job_criteria[1].text.replace("Employment type", "") job_function = job_criteria[2].text.replace("Job function", "") industry = job_criteria[3].text.replace("Industries", "") job_data = JobData( name=row["name"], seniority=seniority, position_type=position_type, job_function=job_function, industry=industry ) job_pipeline.add_data(job_data) job_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_posting, 
reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" LOCALITY = "United States" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["software engineer"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, LOCALITY, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
# PAGES to 3 and our MAX_THREADS to 5.
# If you need a refresher on our main, you can see it again below.
if __name__ == "__main__":
    # Run configuration for the crawl and the follow-up scrape.
    MAX_RETRIES, MAX_THREADS, PAGES = 3, 5, 3
    LOCATION, LOCALITY = "us", "United States"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["software engineer"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with dashes for spaces.
        csv_name = "{}.csv".format(keyword.replace(" ", "-"))

        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            LOCALITY,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        # Flush whatever is still queued before moving to the next keyword.
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)

    logger.info("Crawl complete.")

    # Second stage: visit every posting found by the crawl.
    for crawl_csv in aggregate_files:
        process_results(crawl_csv, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
robots.txt.  Their terms are available here and their robots.txt is here. As stated at the top of their robots.txt, crawling LinkedIn is explicitly prohibited.  By scraping LinkedIn, you can have your account suspended, banned, or even deleted. Then check out ScrapeOps, the complete toolkit for web scraping.
config.json file to it.{"api_key": "your-super-secret-api-key"}.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; const proxyUrl = 
getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processJob(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await 
page.goto(getScrapeOpsUrl(url, location), { timeout: 0 }); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const jobCriteria = await page.$$("li[class='description__job-criteria-item']"); if (jobCriteria.length < 4) { throw new Error("Job Criteria Not Found!"); } const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", ""); const positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", ""); const jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", ""); const industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", ""); const jobData = { name: row.name, seniority: seniority.trim(), position_type: positionType.trim(), job_function: jobFunction.trim(), industry: industry.trim() } await writeToCsv([jobData], `${row.name.replace(" ", "-")}-${row.job_title.replace(" ", "-")}.csv`); success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } } async function processResults(csvFile, location, concurrencyLimit, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; while (rows.length > 0) { const currentBatch = rows.splice(0, concurrencyLimit); const tasks = currentBatch.map(row => processJob(browser, row, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close(); } async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl 
starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { console.time("processResults"); await processResults(file, location, concurrencyLimit, retries); console.timeEnd("processResults"); } console.log("Scrape complete");} main();
main to fine-tune your results:keywords: An array of job titles or terms to be used as search queries on LinkedInconcurrencyLimit: The maximum number of pages or tasks processed concurrently.pages: The number of pages of search results to crawl for each keyword.location: A two-letter country code (e.g., "us") specifying the country for the search results.locality: The human-readable location name (e.g., "United States") used in the search query.retries: The number of retry attempts allowed for failed tasks (e.g., failed page loads or data extractions).node name-of-your-script or node name-of-your-script.js will run the scraper.Modern NodeJS doesn't require a file extension in the name.Once it's done running, you'll get a CSV named after your search.  This one will contain all of your search data.  You get an individual report generated for each job listing as well.  These individual files contain more detailed information about each job posting.ThreadPoolExecutor to add support for multithreading and therefore concurrency.ThreadPoolExecutor to scrape posting data concurrently.https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=
https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=united+states&original_referer=
https://www.linkedin.com/jobs-guest/jobs/api
/api inside of it.  Our requests are actually going to their API. Surprisingly, this API endpoint doesn't respond with JSON or XML, it gives us straight HTML.  In years of web development and scraping, LinkedIn is the only place I've ever seen this. The screenshot below gives us a barebones HTML page without any styling whatsoever, but it is in fact a webpage.  When you're viewing data from the main page, the page fetches this HTML and uses it to update your screen. Once we're finished searching, we'll scrape individual listing data.  Look at the screenshot below.  This is the basic layout for any job posted on LinkedIn.  We don't need to worry about the URLs for these.  We'll find these URLs when we crawl the search results. div elements.  Each one we want has a class name of base-search-card__info. For individual job pages, we look for li elements with a class of description__job-criteria-item. In the image below, you can see a div.  Its class name is base-search-card__info.  This is one of our search results.  To extract this data, we need to find each div matching this class. The next shot holds the li element we want to scrape.  Each li element has the classname, description__job-criteria-item.  For these, we'll extract all li elements matching our target class. &start={pageNumber*10}.  For page 1 of the Software Engineer search, we get this URL: https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=software+engineer&location=United+States&original_referer=&start=0
pageNumber*10 because we begin counting at 0 and each request yields 10 results.  Page 0 (0 * 10) yields results 1 through 10.  Page 1 yields 11 through 20 and so on and so forth.Look below to see how our fully formatted url looks:`https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`country."country": "us" into the API."country": "uk".mkdir linkedin-jobs-scraper cd linkedin-jobs-scraper
npm init --ynpm install puppeteernpm install csv-writernpm install csv-parsenpm install fsconst puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function scrapeSearchResults(browser, keyword, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; console.log(searchData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function 
startCrawl(keyword, locality, location, retries) { const browser = await puppeteer.launch(); await scrapeSearchResults(browser, keyword, locality, location, retries); await browser.close();} async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); }} main();
main(), we call startCrawl().  At the moment, this function opens a browser and passes it into our parsing function, scrapeSearchResults().
await puppeteer.launch(); launches the browser.scrapeSearchResults(browser, keyword, locality, location, retries).await browser.close();scrapeSearchResults().
divCards with await page.$$("div[class='base-search-card__info']");.page.evaluate(): await page.evaluate(element => element.textContent, nameElement).  This method is used for the name, jobTitle, link, and jobLocation.searchData object and remove the whitespace and any newline characters with the trim() method.searchData, we print it to the console.start={pageNumber*10} to the end of our URL.startCrawl() to scrape multiple pages.for loop that allows us to do this.  This is only temporary, later on, we'll replace it with some more powerful code that performs our search concurrently.`https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`range() function similar to the one from Python.function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;}
startCrawl().  It uses a simple for loop to iterate through our pages.async function startCrawl(keyword, pages, locality, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, locality, location, retries) } await browser.close();}
const puppeteer = require("puppeteer");
const createCsvWriter = require("csv-writer").createObjectCsvWriter;
const csvParse = require("csv-parse");
const fs = require("fs");

// Read once at startup. Nothing in this listing uses the key yet.
const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key;

/**
 * Build the array [start, start + 1, ..., end - 1].
 *
 * @param {number} start - First value (inclusive).
 * @param {number} end - Stop value (exclusive).
 * @returns {number[]} The generated sequence; empty when start >= end.
 */
function range(start, end) {
    const array = [];
    for (let i = start; i < end; i++) {
        array.push(i);
    }
    return array;
}

/**
 * Fetch one page of LinkedIn job search results and print every listing.
 *
 * @param {object} browser - Puppeteer browser; a fresh page is opened per attempt.
 * @param {string} keyword - Search term; every space is sent as "+".
 * @param {number} pageNumber - Zero-based page index; the request offset is pageNumber * 10.
 * @param {string} locality - Human-readable location; every space is sent as "+".
 * @param {string} [location="us"] - Country code. Accepted but unused in this version.
 * @param {number} [retries=3] - Additional attempts allowed after the first failure.
 * @returns {Promise<void>}
 */
async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) {
    // A global pattern is required here: replace(" ", "+") with a string
    // pattern only swaps the FIRST space, which breaks keywords such as
    // "senior software engineer".
    const formattedKeyword = keyword.replace(/ /g, "+");
    const formattedLocality = locality.replace(/ /g, "+");
    const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`;

    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const page = await browser.newPage();
        try {
            await page.goto(url);
            console.log(`Successfully fetched: ${url}`);

            const divCards = await page.$$("div[class='base-search-card__info']");

            for (const divCard of divCards) {
                const nameElement = await divCard.$("h4[class='base-search-card__subtitle']");
                const name = await page.evaluate(element => element.textContent, nameElement);

                const jobTitleElement = await divCard.$("h3[class='base-search-card__title']");
                const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement);

                // The link lives on the card's parent element, not inside the info div.
                const parentElement = await page.evaluateHandle(element => element.parentElement, divCard);
                const aTag = await parentElement.$("a");
                const link = await page.evaluate(element => element.getAttribute("href"), aTag);

                const jobLocationElement = await divCard.$("span[class='job-search-card__location']");
                const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement);

                const searchData = {
                    name: name.trim(),
                    job_title: jobTitle.trim(),
                    url: link.trim(),
                    location: jobLocation.trim()
                };

                console.log(searchData);
            }

            success = true;
        } catch (err) {
            // Logged before the increment, so the count reflects attempts still available.
            console.log(`Error: ${err}, tries left ${retries - tries}`);
            tries++;
        } finally {
            await page.close();
        }
    }
}

/**
 * Scrape `pages` result pages for one keyword, one page after another.
 *
 * @param {string} keyword - Search term.
 * @param {number} pages - Number of result pages to fetch (page indexes 0..pages-1).
 * @param {string} locality - Human-readable location for the search.
 * @param {string} location - Country code forwarded to scrapeSearchResults.
 * @param {number} retries - Retry budget forwarded to scrapeSearchResults.
 * @returns {Promise<void>}
 */
async function startCrawl(keyword, pages, locality, location, retries) {
    const pageList = range(0, pages);

    const browser = await puppeteer.launch();

    try {
        for (const page of pageList) {
            await scrapeSearchResults(browser, keyword, page, locality, location, retries);
        }
    } finally {
        // Always release the browser, even if a page task rejects.
        await browser.close();
    }
}

/**
 * Entry point: crawl the search results for each configured keyword.
 */
async function main() {
    const keywords = ["software engineer"];
    // concurrencyLimit and aggregateFiles are declared but not used in this listing.
    const concurrencyLimit = 5;
    const pages = 1;
    const location = "us";
    const locality = "United States";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        console.time("startCrawl");
        await startCrawl(keyword, pages, locality, location, retries);
        console.timeEnd("startCrawl");
        console.log("Crawl complete");
    }
}

// Surface failures instead of leaving an unhandled promise rejection.
main().catch(err => {
    console.error(err);
    process.exitCode = 1;
});
start={pageNumber*10} allows us to control our pagination.  We use pageNumber*10 because we get 10 results per page and our results start at zero.range() and startCrawl(), we can now scrape an array of pages.writeToCsv() function.writeToCsv().success variable and setting it to false.append to the fileExists variable.data isn't an array, we convert it to one.await csvWriter.writeRecords(data); to write our data to the CSV file.success to true.async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }}
data to a CSV file.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await 
page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, pages, locality, location, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); for (const page of pageList) { await scrapeSearchResults(browser, keyword, page, locality, location, retries) } await browser.close();} async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }} main();
async support to scrape concurrently.  We'll rewrite startCrawl() to handle this.Here is our final startCrawl() function.for loop, we create a list of tasks by splicing from our pageList up to our concurrencyLimit.await all these tasks to resolve with Promise.all().concurrencyLimit to 5, we'll scrape up to 5 pages at a time.async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();}
const puppeteer = require("puppeteer");
const createCsvWriter = require("csv-writer").createObjectCsvWriter;
const csvParse = require("csv-parse");
const fs = require("fs");

// Read once at startup. Nothing in this listing uses the key yet.
const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key;

/**
 * Append one or more records to a CSV file, creating it if needed.
 *
 * @param {object|object[]} data - A record or an array of records; the keys
 *   of the first record become the column ids and titles.
 * @param {string} outputFile - Path of the CSV file.
 * @returns {Promise<void>}
 * @throws {Error} "No data to write!" when there is nothing to write, or
 *   "Failed to write to csv" (with the original error as `cause`) when the
 *   write itself fails.
 */
async function writeToCsv(data, outputFile) {
    if (!data) {
        throw new Error("No data to write!");
    }
    // Normalise first so the emptiness check sees the real row count.
    const rows = Array.isArray(data) ? data : [data];
    if (rows.length === 0) {
        throw new Error("No data to write!");
    }

    // Append only when the file already has content; an empty file still
    // needs its header row.
    // NOTE(review): this check and the write are separate steps, so two
    // concurrent first writes to the same new file can race - confirm
    // whether callers need to be serialised.
    const fileExists = fs.existsSync(outputFile) && fs.statSync(outputFile).size > 0;

    const headers = Object.keys(rows[0]).map(key => ({id: key, title: key}));
    const csvWriter = createCsvWriter({
        path: outputFile,
        header: headers,
        append: fileExists
    });

    try {
        await csvWriter.writeRecords(rows);
    } catch (e) {
        console.log("Failed data", rows);
        // Keep the underlying failure reachable for debugging.
        throw new Error("Failed to write to csv", { cause: e });
    }
}

/**
 * Build the array [start, start + 1, ..., end - 1].
 *
 * @param {number} start - First value (inclusive).
 * @param {number} end - Stop value (exclusive).
 * @returns {number[]} The generated sequence; empty when start >= end.
 */
function range(start, end) {
    const array = [];
    for (let i = start; i < end; i++) {
        array.push(i);
    }
    return array;
}

/**
 * Fetch one page of LinkedIn job search results and append every listing to
 * "<keyword-with-dashes>.csv".
 *
 * @param {object} browser - Puppeteer browser; a fresh page is opened per attempt.
 * @param {string} keyword - Search term; every space is sent as "+".
 * @param {number} pageNumber - Zero-based page index; the request offset is pageNumber * 10.
 * @param {string} locality - Human-readable location; every space is sent as "+".
 * @param {string} [location="us"] - Country code. Accepted but unused in this version.
 * @param {number} [retries=3] - Additional attempts allowed after the first failure.
 * @returns {Promise<void>}
 */
async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) {
    // A global pattern is required here: replace(" ", "+") with a string
    // pattern only swaps the FIRST space, which breaks keywords such as
    // "senior software engineer".
    const formattedKeyword = keyword.replace(/ /g, "+");
    const formattedLocality = locality.replace(/ /g, "+");
    const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`;
    const outputFile = `${keyword.replace(/ /g, "-")}.csv`;

    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const page = await browser.newPage();
        try {
            await page.goto(url);
            console.log(`Successfully fetched: ${url}`);

            const divCards = await page.$$("div[class='base-search-card__info']");

            for (const divCard of divCards) {
                const nameElement = await divCard.$("h4[class='base-search-card__subtitle']");
                const name = await page.evaluate(element => element.textContent, nameElement);

                const jobTitleElement = await divCard.$("h3[class='base-search-card__title']");
                const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement);

                // The link lives on the card's parent element, not inside the info div.
                const parentElement = await page.evaluateHandle(element => element.parentElement, divCard);
                const aTag = await parentElement.$("a");
                const link = await page.evaluate(element => element.getAttribute("href"), aTag);

                const jobLocationElement = await divCard.$("span[class='job-search-card__location']");
                const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement);

                const searchData = {
                    name: name.trim(),
                    job_title: jobTitle.trim(),
                    url: link.trim(),
                    location: jobLocation.trim()
                };

                await writeToCsv([searchData], outputFile);
            }

            success = true;
        } catch (err) {
            // Logged before the increment, so the count reflects attempts still available.
            console.log(`Error: ${err}, tries left ${retries - tries}`);
            tries++;
        } finally {
            await page.close();
        }
    }
}

/**
 * Scrape `pages` result pages for one keyword, up to `concurrencyLimit`
 * pages at a time.
 *
 * @param {string} keyword - Search term.
 * @param {number} pages - Number of result pages to fetch (page indexes 0..pages-1).
 * @param {string} locality - Human-readable location for the search.
 * @param {string} location - Country code forwarded to scrapeSearchResults.
 * @param {number} concurrencyLimit - Maximum number of pages processed per batch.
 * @param {number} retries - Retry budget forwarded to scrapeSearchResults.
 * @returns {Promise<void>}
 */
async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) {
    const pageList = range(0, pages);

    const browser = await puppeteer.launch();

    try {
        while (pageList.length > 0) {
            const currentBatch = pageList.splice(0, concurrencyLimit);
            const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries));

            // allSettled waits for every task in the batch. Promise.all would
            // move on at the first rejection while sibling pages are still
            // open, and the browser could be closed underneath them.
            const results = await Promise.allSettled(tasks);
            for (const result of results) {
                if (result.status === "rejected") {
                    console.log(`Failed to process batch: ${result.reason}`);
                }
            }
        }
    } finally {
        await browser.close();
    }
}

/**
 * Entry point: crawl the search results for each configured keyword and
 * remember the CSV file each crawl produced.
 */
async function main() {
    const keywords = ["software engineer"];
    const concurrencyLimit = 5;
    const pages = 1;
    const location = "us";
    const locality = "United States";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        console.time("startCrawl");
        await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries);
        console.timeEnd("startCrawl");
        console.log("Crawl complete");
        // Must match the filename scrapeSearchResults writes to.
        aggregateFiles.push(`${keyword.replace(/ /g, "-")}.csv`);
    }
}

// Surface failures instead of leaving an unhandled promise rejection.
main().catch(err => {
    console.error(err);
    process.exitCode = 1;
});
api_key, url and a country.Let's explain these a little better.api_key:  This is literally a key to our ScrapeOps account.  Your API key is used to authenticate your account when making requests.url:  This is the url of the site we want to scrape.  ScrapeOps will fetch this site and send the result back to us.country: We pass a country code in for this parameter.  ScrapeOps reads our country code and routes our request through a server in the country we chose.function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;}
const puppeteer = require("puppeteer");
const createCsvWriter = require("csv-writer").createObjectCsvWriter;
const csvParse = require("csv-parse");
const fs = require("fs");

// ScrapeOps API key, read once at startup from config.json.
const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key;

/**
 * Append one or more records to a CSV file, creating it if needed.
 *
 * @param {object|object[]} data - A record or an array of records; the keys
 *   of the first record become the column ids and titles.
 * @param {string} outputFile - Path of the CSV file.
 * @returns {Promise<void>}
 * @throws {Error} "No data to write!" when there is nothing to write, or
 *   "Failed to write to csv" (with the original error as `cause`) when the
 *   write itself fails.
 */
async function writeToCsv(data, outputFile) {
    if (!data) {
        throw new Error("No data to write!");
    }
    // Normalise first so the emptiness check sees the real row count.
    const rows = Array.isArray(data) ? data : [data];
    if (rows.length === 0) {
        throw new Error("No data to write!");
    }

    // Append only when the file already has content; an empty file still
    // needs its header row.
    // NOTE(review): this check and the write are separate steps, so two
    // concurrent first writes to the same new file can race - confirm
    // whether callers need to be serialised.
    const fileExists = fs.existsSync(outputFile) && fs.statSync(outputFile).size > 0;

    const headers = Object.keys(rows[0]).map(key => ({id: key, title: key}));
    const csvWriter = createCsvWriter({
        path: outputFile,
        header: headers,
        append: fileExists
    });

    try {
        await csvWriter.writeRecords(rows);
    } catch (e) {
        console.log("Failed data", rows);
        // Keep the underlying failure reachable for debugging.
        throw new Error("Failed to write to csv", { cause: e });
    }
}

/**
 * Build the array [start, start + 1, ..., end - 1].
 *
 * @param {number} start - First value (inclusive).
 * @param {number} end - Stop value (exclusive).
 * @returns {number[]} The generated sequence; empty when start >= end.
 */
function range(start, end) {
    const array = [];
    for (let i = start; i < end; i++) {
        array.push(i);
    }
    return array;
}

/**
 * Wrap a target URL in a ScrapeOps proxy URL.
 *
 * @param {string} url - The page to fetch through the proxy.
 * @param {string} [location="us"] - Country code sent as the `country` parameter.
 * @returns {string} Proxy URL carrying api_key, url and country as
 *   URL-encoded query parameters.
 */
function getScrapeOpsUrl(url, location="us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

/**
 * Fetch one page of LinkedIn job search results through the ScrapeOps proxy
 * and append every listing to "<keyword-with-dashes>.csv".
 *
 * @param {object} browser - Puppeteer browser; a fresh page is opened per attempt.
 * @param {string} keyword - Search term; every space is sent as "+".
 * @param {number} pageNumber - Zero-based page index; the request offset is pageNumber * 10.
 * @param {string} locality - Human-readable location; every space is sent as "+".
 * @param {string} [location="us"] - Country code passed to the proxy.
 * @param {number} [retries=3] - Additional attempts allowed after the first failure.
 * @returns {Promise<void>}
 */
async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) {
    // A global pattern is required here: replace(" ", "+") with a string
    // pattern only swaps the FIRST space, which breaks keywords such as
    // "senior software engineer".
    const formattedKeyword = keyword.replace(/ /g, "+");
    const formattedLocality = locality.replace(/ /g, "+");
    const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`;
    const proxyUrl = getScrapeOpsUrl(url, location);
    const outputFile = `${keyword.replace(/ /g, "-")}.csv`;

    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const page = await browser.newPage();
        try {
            // timeout: 0 disables Puppeteer's navigation timeout.
            await page.goto(proxyUrl, { timeout: 0 });
            console.log(`Successfully fetched: ${url}`);

            const divCards = await page.$$("div[class='base-search-card__info']");

            for (const divCard of divCards) {
                const nameElement = await divCard.$("h4[class='base-search-card__subtitle']");
                const name = await page.evaluate(element => element.textContent, nameElement);

                const jobTitleElement = await divCard.$("h3[class='base-search-card__title']");
                const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement);

                // The link lives on the card's parent element, not inside the info div.
                const parentElement = await page.evaluateHandle(element => element.parentElement, divCard);
                const aTag = await parentElement.$("a");
                const link = await page.evaluate(element => element.getAttribute("href"), aTag);

                const jobLocationElement = await divCard.$("span[class='job-search-card__location']");
                const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement);

                const searchData = {
                    name: name.trim(),
                    job_title: jobTitle.trim(),
                    url: link.trim(),
                    location: jobLocation.trim()
                };

                await writeToCsv([searchData], outputFile);
            }

            success = true;
        } catch (err) {
            // Logged before the increment, so the count reflects attempts still available.
            console.log(`Error: ${err}, tries left ${retries - tries}`);
            tries++;
        } finally {
            await page.close();
        }
    }
}

/**
 * Scrape `pages` result pages for one keyword, up to `concurrencyLimit`
 * pages at a time.
 *
 * @param {string} keyword - Search term.
 * @param {number} pages - Number of result pages to fetch (page indexes 0..pages-1).
 * @param {string} locality - Human-readable location for the search.
 * @param {string} location - Country code forwarded to scrapeSearchResults.
 * @param {number} concurrencyLimit - Maximum number of pages processed per batch.
 * @param {number} retries - Retry budget forwarded to scrapeSearchResults.
 * @returns {Promise<void>}
 */
async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) {
    const pageList = range(0, pages);

    const browser = await puppeteer.launch();

    try {
        while (pageList.length > 0) {
            const currentBatch = pageList.splice(0, concurrencyLimit);
            const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries));

            // allSettled waits for every task in the batch. Promise.all would
            // move on at the first rejection while sibling pages are still
            // open, and the browser could be closed underneath them.
            const results = await Promise.allSettled(tasks);
            for (const result of results) {
                if (result.status === "rejected") {
                    console.log(`Failed to process batch: ${result.reason}`);
                }
            }
        }
    } finally {
        await browser.close();
    }
}

/**
 * Entry point: crawl the search results for each configured keyword and
 * remember the CSV file each crawl produced.
 */
async function main() {
    const keywords = ["software engineer"];
    const concurrencyLimit = 5;
    const pages = 1;
    const location = "us";
    const locality = "United States";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        console.time("startCrawl");
        await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries);
        console.timeEnd("startCrawl");
        console.log("Crawl complete");
        // Must match the filename scrapeSearchResults writes to.
        aggregateFiles.push(`${keyword.replace(/ /g, "-")}.csv`);
    }
}

// Surface failures instead of leaving an unhandled promise rejection.
main().catch(err => {
    console.error(err);
    process.exitCode = 1;
});
concurrencyLimit of 5.Feel free to change any of the following from the main() function.keywordsconcurrencyLimitpageslocationlocalityretriesmain() if you'd like to review it.async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 3; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); }}
processJob().  We check for bad responses and throw an Error if we don't receive the correct response.  If we get a good response, we continue on and parse the page.async function processJob(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await page.goto(url); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const jobCriteria = await page.$$("li[class='description__job-criteria-item']"); if (jobCriteria.length < 4) { throw new Error("Job Criteria Not Found!"); } const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", ""); const positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", ""); const jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", ""); const industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", ""); const jobData = { name: row.name, seniority: seniority.trim(), position_type: positionType.trim(), job_function: jobFunction.trim(), industry: industry.trim() } console.log(jobData) success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } }
jobCriteria = await page.$$("li[class='description__job-criteria-item']"); finds the items from our criteria list.const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", "");: seniority levelconst positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", "");: position typeconst jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", "");: job functionconst industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", "");: industrypage.evaluate() to pull the text from each element we find.row, we need to read the rows from our CSV file.  We'll read our file into an array and then we'll use a for loop to scrape details from every posting we found.Here is our first iteration of processResults().Later on, we'll rewrite it and add concurrency support.  It's pretty similar to our startCrawl() function from earlier in this tutorial.async function processResults(csvFile, location, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; for (const row of rows) { await processJob(browser, row, location, retries) } await browser.close(); }
const puppeteer = require("puppeteer");
const createCsvWriter = require("csv-writer").createObjectCsvWriter;
const csvParse = require("csv-parse");
const fs = require("fs");

const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key;

/**
 * Write one record or an array of records to a CSV file.
 *
 * Column headers are taken from the keys of the first record. If the file
 * already exists the records are appended to it.
 *
 * @param {object|object[]} data - Record(s) to write.
 * @param {string} outputFile - Path of the CSV file.
 * @throws {Error} If there is no data, or if the write fails.
 */
async function writeToCsv(data, outputFile) {
    if (!data || data.length === 0) {
        throw new Error("No data to write!");
    }
    if (!(data instanceof Array)) {
        data = [data];
    }

    const fileExists = fs.existsSync(outputFile);
    const headers = Object.keys(data[0]).map(key => ({id: key, title: key}));
    const csvWriter = createCsvWriter({
        path: outputFile,
        header: headers,
        append: fileExists
    });

    // A single attempt: the old `while (!success)` loop could never run twice
    // because a failed write always threw.
    try {
        await csvWriter.writeRecords(data);
    } catch (e) {
        console.log("Failed data", data);
        // Keep the underlying error attached for debugging.
        throw new Error("Failed to write to csv", { cause: e });
    }
}

/**
 * Read a CSV file into an array of row objects keyed by the header row.
 *
 * @param {string} inputFile - Path of the CSV file.
 * @returns {Promise<object[]>} The parsed rows.
 */
async function readCsv(inputFile) {
    const results = [];
    const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({
        columns: true,
        delimiter: ",",
        trim: true,
        skip_empty_lines: true
    }));

    for await (const record of parser) {
        results.push(record);
    }
    return results;
}

/**
 * Build the array of integers from `start` (inclusive) to `end` (exclusive).
 */
function range(start, end) {
    const array = [];
    for (let i=start; i<end; i++) {
        array.push(i);
    }
    return array;
}

/**
 * Wrap a target URL in a ScrapeOps proxy URL.
 *
 * @param {string} url - The page to fetch through the proxy.
 * @param {string} [location="us"] - Sent as the proxy's `country` parameter.
 * @returns {string} The proxy URL (note: it contains the API key).
 */
function getScrapeOpsUrl(url, location="us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

/**
 * Scrape one page of job-search results and append each card to
 * `<keyword>.csv`.
 *
 * Failures are logged and retried (up to `retries` extra attempts).
 *
 * @param {object} browser - Puppeteer browser used to open pages.
 * @param {string} keyword - Search keywords.
 * @param {number} pageNumber - Zero-based page index; the query uses `start = pageNumber*10`.
 * @param {string} locality - Value for the search's `location` query parameter.
 * @param {string} [location="us"] - Country passed to the proxy.
 * @param {number} [retries=3] - Extra attempts allowed after the first failure.
 */
async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) {
    let tries = 0;
    let success = false;

    // A string pattern in String.replace() only swaps the FIRST space, so
    // keywords with three or more words kept raw spaces. Use a global regex.
    const formattedKeyword = keyword.replace(/ /g, "+");
    const formattedLocality = locality.replace(/ /g, "+");

    while (tries <= retries && !success) {
        const page = await browser.newPage();
        try {
            const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`;

            const proxyUrl = getScrapeOpsUrl(url, location);
            await page.goto(proxyUrl, { timeout: 0 });

            console.log(`Successfully fetched: ${url}`);

            const divCards = await page.$$("div[class='base-search-card__info']");

            for (const divCard of divCards) {
                const nameElement = await divCard.$("h4[class='base-search-card__subtitle']");
                const name = await page.evaluate(element => element.textContent, nameElement);

                const jobTitleElement = await divCard.$("h3[class='base-search-card__title']");
                const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement);

                // The link lives on the card's parent element, not inside the info div.
                const parentElement = await page.evaluateHandle(element => element.parentElement, divCard);
                const aTag = await parentElement.$("a");
                const link = await page.evaluate(element => element.getAttribute("href"), aTag);

                const jobLocationElement = await divCard.$("span[class='job-search-card__location']");
                const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement);

                const searchData = {
                    name: name.trim(),
                    job_title: jobTitle.trim(),
                    url: link.trim(),
                    location: jobLocation.trim()
                };

                await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`);
            }

            success = true;
        } catch (err) {
            console.log(`Error: ${err}, tries left ${retries - tries}`);
            tries++;
        } finally {
            await page.close();
        }
    }
}

/**
 * Crawl `pages` pages of search results, `concurrencyLimit` pages at a time.
 */
async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) {
    const pageList = range(0, pages);

    const browser = await puppeteer.launch();

    while (pageList.length > 0) {
        const currentBatch = pageList.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }

    await browser.close();
}

/**
 * Open a job posting and extract its four "job criteria" fields.
 *
 * Each attempt opens a fresh page, navigates to `row.url`, and requires an
 * HTTP 200 response before parsing. The parsed record is logged to the
 * console. Failures are logged and retried (up to `retries` extra attempts);
 * nothing is thrown to the caller.
 *
 * @param {object} browser - Puppeteer browser used to open pages.
 * @param {object} row - CSV row; `row.url` and `row.name` are read.
 * @param {string} location - Accepted for call compatibility; not used by this version.
 * @param {number} [retries=3] - Extra attempts allowed after the first failure.
 */
async function processJob(browser, row, location, retries = 3) {
    const url = row.url;
    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const page = await browser.newPage();

        try {
            const response = await page.goto(url);
            if (!response || response.status() !== 200) {
                // page.goto() may resolve to null, so guard before reading the
                // status. The status must be part of the message string:
                // Error's second argument is an options object, not more text.
                const status = response ? response.status() : "no response";
                throw new Error(`Failed to fetch page, status: ${status}`);
            }

            const jobCriteria = await page.$$("li[class='description__job-criteria-item']");
            if (jobCriteria.length < 4) {
                throw new Error("Job Criteria Not Found!");
            }

            // Each item's text starts with its label; strip it to keep the value.
            const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", "");
            const positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", "");
            const jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", "");
            const industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", "");

            const jobData = {
                name: row.name,
                seniority: seniority.trim(),
                position_type: positionType.trim(),
                job_function: jobFunction.trim(),
                industry: industry.trim()
            };
            console.log(jobData);

            success = true;
            console.log("Successfully parsed", row.url);
        } catch (err) {
            // Compute the remaining attempts before incrementing so the count
            // is accurate (it previously under-reported by one and hit -1).
            const triesLeft = retries - tries;
            tries++;
            // Log the plain target URL: the proxy URL embeds the API key.
            console.log(`Error: ${err}, tries left: ${triesLeft}, url: ${url}`);
        } finally {
            await page.close();
        }
    }
}

/**
 * Read the crawl CSV and process each row's job posting, one at a time.
 */
async function processResults(csvFile, location, retries) {
    const rows = await readCsv(csvFile);
    const browser = await puppeteer.launch();

    for (const row of rows) {
        await processJob(browser, row, location, retries);
    }
    await browser.close();
}

/**
 * Entry point: crawl search results for each keyword, then scrape every
 * posting listed in the resulting CSV files.
 */
async function main() {
    const keywords = ["software engineer"];
    const concurrencyLimit = 5;
    const pages = 1;
    const location = "us";
    const locality = "United States";
    const retries = 3;
    const aggregateFiles = [];

    for (const keyword of keywords) {
        console.log("Crawl starting");
        console.time("startCrawl");
        await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries);
        console.timeEnd("startCrawl");
        console.log("Crawl complete");
        aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`);
    }

    console.log("Starting scrape");
    for (const file of aggregateFiles) {
        console.time("processResults");
        await processResults(file, location, retries);
        console.timeEnd("processResults");
    }
    console.log("Scrape complete");
}

main();
jobData object.  We also already have a writeToCsv() function.  Instead of logging our jobData to the console, we just need to store it.In the code below, we're going to do exactly that.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = 
`https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processJob(browser, row, 
location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await page.goto(url); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const jobCriteria = await page.$$("li[class='description__job-criteria-item']"); if (jobCriteria.length < 4) { throw new Error("Job Criteria Not Found!"); } const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", ""); const positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", ""); const jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", ""); const industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", ""); const jobData = { name: row.name, seniority: seniority.trim(), position_type: positionType.trim(), job_function: jobFunction.trim(), industry: industry.trim() } await writeToCsv([jobData], `${row.name.replace(" ", "-")}-${row.job_title.replace(" ", "-")}.csv`); success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } } async function processResults(csvFile, location, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; for (const row of rows) { await processJob(browser, row, location, retries) } await browser.close(); } async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await 
startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { console.time("processResults"); await processResults(file, location, retries); console.timeEnd("processResults"); } console.log("Scrape complete");} main();
jobData holds the data we pull from the page.
jobData into writeToCsv() and it then gets saved to a CSV file.
tasks by splicing our rows by our concurrencyLimit.
await everything to resolve using Promise.all().
concurrencyLimit to 5, we'll be processing the rows in batches of 5.
async function processResults(csvFile, location, concurrencyLimit, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; while (rows.length > 0) { const currentBatch = rows.splice(0, concurrencyLimit); const tasks = currentBatch.map(row => processJob(browser, row, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();}
await readCsv(csvFile);:  This returns all the rows from the CSV file in an array.rows.splice(0, concurrencyLimit); shrinks the rows array and gives us a chunk to work with.currentBatch.map(row => processJob(browser, row, location, retries)) runs processJob() on each element in the chunk.await Promise.all(tasks); waits for each one of our tasks to resolve.rows array is completely gone.const response = await page.goto(getScrapeOpsUrl(url, location), { timeout: 0 });{ timeout: 0 } to tell Puppeteer not to time out.  When dealing with a proxy along with a site as difficult as LinkedIn, pages sometimes take awhile to come back to us.location is getting passed into our proxy function, we're actually going to be routed through a server in the country of our choice.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function range(start, end) { const array = []; for (let i=start; i<end; i++) { array.push(i); } return array;} function getScrapeOpsUrl(url, 
location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults(browser, keyword, pageNumber, locality, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(" ", "+"); const formattedLocality = locality.replace(" ", "+"); const page = await browser.newPage(); try { const url = `https://www.linkedin.com/jobs-guest/jobs/api/seeMoreJobPostings/search?keywords=${formattedKeyword}&location=${formattedLocality}&original_referer=&start=${pageNumber*10}`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const nameElement = await divCard.$("h4[class='base-search-card__subtitle']"); const name = await page.evaluate(element => element.textContent, nameElement); const jobTitleElement = await divCard.$("h3[class='base-search-card__title']"); const jobTitle = await page.evaluate(element => element.textContent, jobTitleElement); const parentElement = await page.evaluateHandle(element => element.parentElement, divCard); const aTag = await parentElement.$("a"); const link = await page.evaluate(element => element.getAttribute("href"), aTag); const jobLocationElement = await divCard.$("span[class='job-search-card__location']"); const jobLocation = await page.evaluate(element => element.textContent, jobLocationElement); const searchData = { name: name.trim(), job_title: jobTitle.trim(), url: link.trim(), location: jobLocation.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function 
startCrawl(keyword, pages, locality, location, concurrencyLimit, retries) { const pageList = range(0, pages); const browser = await puppeteer.launch(); while (pageList.length > 0) { const currentBatch = pageList.splice(0, concurrencyLimit); const tasks = currentBatch.map(page => scrapeSearchResults(browser, keyword, page, locality, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processJob(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await page.goto(getScrapeOpsUrl(url, location), { timeout: 0 }); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const jobCriteria = await page.$$("li[class='description__job-criteria-item']"); if (jobCriteria.length < 4) { throw new Error("Job Criteria Not Found!"); } const seniority = (await page.evaluate(element => element.textContent, jobCriteria[0])).replace("Seniority level", ""); const positionType = (await page.evaluate(element => element.textContent, jobCriteria[1])).replace("Employment type", ""); const jobFunction = (await page.evaluate(element => element.textContent, jobCriteria[2])).replace("Job function", ""); const industry = (await page.evaluate(element => element.textContent, jobCriteria[3])).replace("Industries", ""); const jobData = { name: row.name, seniority: seniority.trim(), position_type: positionType.trim(), job_function: jobFunction.trim(), industry: industry.trim() } await writeToCsv([jobData], `${row.name.replace(" ", "-")}-${row.job_title.replace(" ", "-")}.csv`); success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } } async 
function processResults(csvFile, location, concurrencyLimit, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; while (rows.length > 0) { const currentBatch = rows.splice(0, concurrencyLimit); const tasks = currentBatch.map(row => processJob(browser, row, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close(); } async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 1; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { console.time("processResults"); await processResults(file, location, concurrencyLimit, retries); console.timeEnd("processResults"); } console.log("Scrape complete");} main();
main() below.  As we mentioned earlier, you can change the following to tweak your results.
- keywords
- concurrencyLimit
- pages
- location
- locality
- retries
async function main() { const keywords = ["software engineer"]; const concurrencyLimit = 5; const pages = 3; const location = "us"; const locality = "United States"; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log("Crawl starting"); console.time("startCrawl"); await startCrawl(keyword, pages, locality, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } console.log("Starting scrape"); for (const file of aggregateFiles) { console.time("processResults"); await processResults(file, location, concurrencyLimit, retries); console.timeEnd("processResults"); } console.log("Scrape complete");}
robots.txt because we haven't agreed to anything, but they take these policies very seriously. Their terms are available here and their robots.txt is here.  As stated at the top of their robots.txt, crawling LinkedIn is explicitly prohibited. By scraping LinkedIn, you can have your account suspended, banned, or even deleted.