Then check out ScrapeOps, the complete toolkit for web scraping.
config.json
file. Inside the config file, add your ScrapeOps API key, {"api_key": "your-super-secret-api-key"}
.Then, copy and paste the code below into a Python file.

import os
import csv
import requests
import json
import logging
import time  # needed for close_pipeline()'s time.sleep()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    display_name: str = ""
    url: str = ""
    location: str = ""
    companies: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ProfileData:
    name: str = ""
    company: str = ""
    company_profile: str = ""
    job_title: str = ""
    followers: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def crawl_profiles(name, location, data_pipeline=None, retries=3):
    first_name = name.split()[0]
    last_name = name.split()[1]
    url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            profile_cards = soup.find_all("div", class_="base-search-card__info")
            for card in profile_cards:
                href = card.parent.get("href").split("?")[0]
                name = href.split("/")[-1].split("?")[0]
                display_name = card.find("h3", class_="base-search-card__title").text
                location = card.find("p", class_="people-search-card__location").text
                companies = "n/a"
                has_companies = card.find("span", class_="entity-list-meta__entities-list")
                if has_companies:
                    companies = has_companies.text

                search_data = SearchData(
                    name=name,
                    display_name=display_name,
                    url=href,
                    location=location,
                    companies=companies
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            crawl_profiles,
            profile_list,
            [location] * len(profile_list),
            [data_pipeline] * len(profile_list),
            [retries] * len(profile_list)
        )


def scrape_profile(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            head = soup.find("head")
            script = head.select_one("script[type='application/ld+json']")
            json_data_graph = json.loads(script.text)["@graph"]
            json_data = {}
            person_pipeline = DataPipeline(f"{row['name']}.csv")
            for element in json_data_graph:
                if element["@type"] == "Person":
                    json_data = element
                    break

            company = "n/a"
            company_profile = "n/a"
            job_title = "n/a"

            if "jobTitle" in json_data.keys() and isinstance(json_data["jobTitle"], list) and len(json_data["jobTitle"]) > 0:
                job_title = json_data["jobTitle"][0]

            has_company = "worksFor" in json_data.keys() and len(json_data["worksFor"]) > 0
            if has_company:
                company = json_data["worksFor"][0]["name"]
                has_company_url = "url" in json_data["worksFor"][0].keys()
                if has_company_url:
                    company_profile = json_data["worksFor"][0]["url"]

            has_interactions = "interactionStatistic" in json_data.keys()
            followers = 0
            if has_interactions:
                stats = json_data["interactionStatistic"]
                if stats["name"] == "Follows" and stats["@type"] == "InteractionCounter":
                    followers = stats["userInteractionCount"]

            profile_data = ProfileData(
                name=row["name"],
                company=company,
                company_profile=company_profile,
                job_title=job_title,
                followers=followers
            )
            person_pipeline.add_data(profile_data)
            person_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                scrape_profile,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["bill gates", "elon musk"]

    ## Job Processes
    filename = "profile-crawl.csv"
    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")

    process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
https://www.linkedin.com/pub/dir?firstName=bill&lastName=gates&trk=people-guest_people-search-bar_search-submit
https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit
https://www.linkedin.com/in/williamhgates?trk=people-guest_people_search-card
div
with a class
of base-search-card__info
.country
parameter and we'll get routed through a country of our choosing. If we want to appear in the US, we can pass "country": "us"
.You can view the full list of supported countries on this page.

mkdir linkedin-profiles-scraper
cd linkedin-profiles-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
crawl_profiles()
.First, we find all of our div
elements. Then, we iterate through them and pull the relevant data from them.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def crawl_profiles(name, location, retries=3):
    first_name = name.split()[0]
    last_name = name.split()[1]
    url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            profile_cards = soup.find_all("div", class_="base-search-card__info")
            for card in profile_cards:
                href = card.parent.get("href").split("?")[0]
                name = href.split("/")[-1].split("?")[0]
                display_name = card.find("h3", class_="base-search-card__title").text
                location = card.find("p", class_="people-search-card__location").text
                companies = "n/a"
                has_companies = card.find("span", class_="entity-list-meta__entities-list")
                if has_companies:
                    companies = has_companies.text

                search_data = {
                    "name": name,
                    "display_name": display_name,
                    "url": href,
                    "location": location,
                    "companies": companies
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_crawl(profile_list, location, retries=3):
    for name in profile_list:
        crawl_profiles(name, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["bill gates", "elon musk"]

    ## Job Processes
    filename = "profile-crawl.csv"
    start_crawl(keyword_list, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
soup.find_all("div", class_="base-search-card__info")
gets all of our profile cards for us.card.parent.get("href").split("?")[0]
to get the link to each profile.h3
and pull the display name from it.location
from the card's p
element.span
elements to see if there are companies present and if there are companies, we extract them.dataclass
called SearchData
. Afterward, we'll create a DataPipeline
.Here is our SearchData
. We use it to represent the objects we've been scraping.

@dataclass
class SearchData:
    name: str = ""
    display_name: str = ""
    url: str = ""
    location: str = ""
    companies: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
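As a quick sanity check (not part of the scraper itself), here is how the __post_init__ cleanup behaves: populated string fields get stripped, and empty string fields get a default value. The values below are purely illustrative.

# Hypothetical standalone check, assuming SearchData is defined as above
example = SearchData(
    name="  williamhgates  ",   # trailing spaces get stripped
    display_name="Bill Gates",
    url="https://www.linkedin.com/in/williamhgates",
    location="",                 # empty fields get a default value
    companies=""
)
print(example.name, "|", example.location, "|", example.companies)
# williamhgates | No location | No companies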
Once we've parsed our results into SearchData
, we need to store it. To do this, we'll pass it into a DataPipeline
. Our pipeline in the snippet below takes in a dataclass
and saves it to a CSV file. If the CSV already exists, we open it in append mode, otherwise we write a new one. On top of that, our DataPipeline
also has some logic for filtering out duplicates.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        # Note: this uses time.sleep(), so the full script needs "import time" at the top
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
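Here is a minimal usage sketch (the filename and objects are placeholders, not part of the scraper): items queue up in memory, duplicates are dropped by name, and close_pipeline() flushes whatever is left to disk.

# Hypothetical usage sketch, assuming SearchData and DataPipeline are defined as above
pipeline = DataPipeline(csv_filename="example-output.csv")

# Items queue up in memory and get flushed to the CSV in batches
pipeline.add_data(SearchData(name="williamhgates", display_name="Bill Gates"))
pipeline.add_data(SearchData(name="williamhgates", display_name="Bill Gates"))  # duplicate, dropped

# Always close the pipeline so any remaining queued items get written
pipeline.close_pipeline()

With those two pieces in place, the full crawler now stores its results instead of printing them: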
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" display_name: str = "" url: str = "" location: str = "" companies: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def crawl_profiles(name, location, data_pipeline=None, retries=3): first_name = name.split()[0] last_name = name.split()[1] url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") profile_cards = soup.find_all("div", class_="base-search-card__info") for card in profile_cards: href = card.parent.get("href").split("?")[0] name = href.split("/")[-1].split("?")[0] display_name = card.find("h3", class_="base-search-card__title").text location = card.find("p", class_="people-search-card__location").text companies = "n/a" has_companies = card.find("span", class_="entity-list-meta__entities-list") if has_companies: companies = has_companies.text search_data = SearchData( name=name, display_name=display_name, url=href, location=location, companies=companies ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_crawl(profile_list, location, data_pipeline=None, retries=3): for name in profile_list: crawl_profiles(name, location, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["bill gates", "elon musk"] ## Job Processes filename = "profile-crawl.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
In the full code above, we now store our results as SearchData
objects and pass them into the DataPipeline
. To add concurrency, we'll use ThreadPoolExecutor
. It opens up a new thread pool with a max_threads
argument. Then, it runs a function of our choice on each available thread. Take a look at the example below.

def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            crawl_profiles,
            profile_list,
            [location] * len(profile_list),
            [data_pipeline] * len(profile_list),
            [retries] * len(profile_list)
        )
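If the executor.map() call looks unfamiliar, here is a tiny, self-contained illustration (with a made-up worker function, not part of the scraper) of how it pairs up one element from each iterable per call.

# Hypothetical illustration of executor.map() fan-out; greet() is not part of the scraper
import concurrent.futures

def greet(name, location):
    print(f"{name} ({location})")

names = ["bill gates", "elon musk"]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Each thread gets one element from each iterable: ("bill gates", "us"), ("elon musk", "us")
    executor.map(greet, names, ["us"] * len(names))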
Instead of our old for
loop, we open up a new thread pool and pass crawl_profiles
into it. All other arguments get passed in as arrays. ThreadPoolExecutor
takes these arrays and passes each element from each array into an individual instance of crawl_profiles
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" display_name: str = "" url: str = "" location: str = "" companies: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def crawl_profiles(name, location, data_pipeline=None, retries=3): first_name = name.split()[0] last_name = name.split()[1] url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") profile_cards = soup.find_all("div", class_="base-search-card__info") for card in profile_cards: href = card.parent.get("href").split("?")[0] name = href.split("/")[-1].split("?")[0] display_name = card.find("h3", class_="base-search-card__title").text location = card.find("p", class_="people-search-card__location").text companies = "n/a" has_companies = card.find("span", class_="entity-list-meta__entities-list") if has_companies: companies = has_companies.text search_data = SearchData( name=name, display_name=display_name, url=href, location=location, companies=companies ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( crawl_profiles, profile_list, [location] * len(profile_list), [data_pipeline] * len(profile_list), [retries] * len(profile_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["bill gates", "elon musk"] ## Job Processes filename = "profile-crawl.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
kwarg
.It then creates a payload and wraps all this information into a new URL that routes our page through the ScrapeOps Proxy API.When talking to the ScrapeOps API, we can use the country
param to choose our location. There are many other options we can use such as residential
and mobile
but typically, our country
parameter is enough.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
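For example, assuming API_KEY has been loaded from config.json as shown earlier, wrapping a LinkedIn URL looks roughly like this (the output shown is an approximation; the exact query-string encoding comes from urlencode):

# Illustrative only
target = "https://www.linkedin.com/in/williamhgates"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=your-super-secret-api-key&url=https%3A%2F%2Fwww.linkedin.com%2Fin%2Fwilliamhgates&country=us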
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" display_name: str = "" url: str = "" location: str = "" companies: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def crawl_profiles(name, location, data_pipeline=None, retries=3): first_name = name.split()[0] last_name = name.split()[1] url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") profile_cards = soup.find_all("div", class_="base-search-card__info") for card in profile_cards: href = card.parent.get("href").split("?")[0] name = href.split("/")[-1].split("?")[0] display_name = card.find("h3", class_="base-search-card__title").text location = card.find("p", class_="people-search-card__location").text companies = "n/a" has_companies = card.find("span", class_="entity-list-meta__entities-list") if has_companies: companies = has_companies.text search_data = SearchData( name=name, display_name=display_name, url=href, location=location, companies=companies ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( crawl_profiles, profile_list, [location] * len(profile_list), [data_pipeline] * len(profile_list), [retries] * len(profile_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["bill gates", "elon musk"] ## Job Processes filename = "profile-crawl.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
main
.You can run the script with the following command: python name_of_your_script.py
.MAX_RETRIES
MAX_THREADS
LOCATION
keyword_list
profile-crawl.csv
and then scrape each individual profile found in the crawl. We're going to use iterative building to add features, just like we did earlier.

def scrape_profile(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            head = soup.find("head")
            script = head.select_one("script[type='application/ld+json']")
            json_data_graph = json.loads(script.text)["@graph"]
            json_data = {}
            person_pipeline = DataPipeline(f"{row['name']}.csv")
            for element in json_data_graph:
                if element["@type"] == "Person":
                    json_data = element
                    break

            company = "n/a"
            company_profile = "n/a"
            job_title = "n/a"

            if "jobTitle" in json_data.keys() and isinstance(json_data["jobTitle"], list) and len(json_data["jobTitle"]) > 0:
                job_title = json_data["jobTitle"][0]

            has_company = "worksFor" in json_data.keys() and len(json_data["worksFor"]) > 0
            if has_company:
                company = json_data["worksFor"][0]["name"]
                has_company_url = "url" in json_data["worksFor"][0].keys()
                if has_company_url:
                    company_profile = json_data["worksFor"][0]["url"]

            has_interactions = "interactionStatistic" in json_data.keys()
            followers = 0
            if has_interactions:
                stats = json_data["interactionStatistic"]
                if stats["name"] == "Follows" and stats["@type"] == "InteractionCounter":
                    followers = stats["userInteractionCount"]

            profile_data = {
                "name": row["name"],
                "company": company,
                "company_profile": company_profile,
                "job_title": job_title,
                "followers": followers
            }
            print(profile_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
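The fields this parser pulls out come from the JSON-LD metadata embedded in the page. As a rough, heavily trimmed sketch (illustrative values only, inferred from the parsing code above; the real blob contains far more), the structure the code expects looks something like this:

{
  "@graph": [
    {
      "@type": "Person",
      "name": "Bill Gates",
      "jobTitle": ["Example Title"],
      "worksFor": [
        {"name": "Example Company", "url": "https://www.linkedin.com/company/example-company"}
      ],
      "interactionStatistic": {
        "@type": "InteractionCounter",
        "name": "Follows",
        "userInteractionCount": 1234567
      }
    }
  ]
}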
First, we find the head
of the page. The head
contains all sorts of metadata. We use head.select_one("script[type='application/ld+json']")
to find our JSON blob located inside the head
. We iterate through the "@graph"
array until we find an element whose "@type" is "Person"
. We use this "Person"
field to extract our data. company
: the company that a person works for.company_profile
: the company's LinkedIn profile.job_title
: the person's official job title.followers
: the number of other people following this person. Next, we'll write process_results()
.This one is pretty simple. It reads our CSV file into an array of dict
objects. Then it runs scrape_profile()
on each profile from the array.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            scrape_profile(row, location, retries=retries)
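Each row that csv.DictReader yields is a plain dict keyed by the crawl's column names (the SearchData fields), which is why scrape_profile() can read row["url"] and row["name"] directly. A quick sketch, assuming profile-crawl.csv already exists from the crawl step:

# Hypothetical inspection of the crawl output; column names come from SearchData
import csv

with open("profile-crawl.csv", newline="") as file:
    for row in csv.DictReader(file):
        # Each row looks like: {'name': ..., 'display_name': ..., 'url': ..., 'location': ..., 'companies': ...}
        print(row["name"], row["url"])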
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" display_name: str = "" url: str = "" location: str = "" companies: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def crawl_profiles(name, location, data_pipeline=None, retries=3): first_name = name.split()[0] last_name = name.split()[1] url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") profile_cards = soup.find_all("div", class_="base-search-card__info") for card in profile_cards: href = card.parent.get("href").split("?")[0] name = href.split("/")[-1].split("?")[0] display_name = card.find("h3", class_="base-search-card__title").text location = card.find("p", class_="people-search-card__location").text companies = "n/a" has_companies = card.find("span", class_="entity-list-meta__entities-list") if has_companies: companies = has_companies.text search_data = SearchData( name=name, display_name=display_name, url=href, location=location, companies=companies ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( crawl_profiles, profile_list, [location] * len(profile_list), [data_pipeline] * len(profile_list), [retries] * len(profile_list) ) def scrape_profile(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code != 200: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") head = soup.find("head") script = head.select_one("script[type='application/ld+json']") json_data_graph = json.loads(script.text)["@graph"] json_data = {} person_pipeline = DataPipeline(f"{row['name']}.csv") for element in json_data_graph: if element["@type"] == "Person": json_data = element break company = "n/a" company_profile = "n/a" job_title = "n/a" if "jobTitle" in json_data.keys() and type(json_data["jobTitle"] == list) and len(json_data["jobTitle"]) > 0: job_title = json_data["jobTitle"][0] has_company = "worksFor" in json_data.keys() and len(json_data["worksFor"]) > 0 if has_company: company = json_data["worksFor"][0]["name"] has_company_url = "url" in json_data["worksFor"][0].keys() if has_company_url: company_profile = json_data["worksFor"][0]["url"] has_interactions = 
"interactionStatistic" in json_data.keys() followers = 0 if has_interactions: stats = json_data["interactionStatistic"] if stats["name"] == "Follows" and stats["@type"] == "InteractionCounter": followers = stats["userInteractionCount"] profile_data = { "name": row["name"], "company": company, "company_profile": company_profile, "job_title": job_title, "followers": followers } print(profile_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: scrape_profile(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["bill gates", "elon musk"] ## Job Processes filename = "profile-crawl.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") process_results(filename, LOCATION, retries=MAX_RETRIES)
scrape_profile()
is used to scrape data from individual profile pages.process_results()
reads our CSV file and runs scrape_profile()
on all of the profiles from our CSV. Earlier, we built a SearchData
class and a DataPipeline
. Both of these classes are technically reusable but SearchData
won't work for us. We need another dataclass
with different fields.Take a look at our new dataclass
. We'll call this one ProfileData.

@dataclass
class ProfileData:
    name: str = ""
    company: str = ""
    company_profile: str = ""
    job_title: str = ""
    followers: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
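One detail worth noting: followers is an int, so check_string_fields() leaves it alone; only empty string fields get the "No ..." defaults. A quick hypothetical check (values are placeholders):

# Hypothetical check, assuming ProfileData is defined as above
p = ProfileData(name="williamhgates", job_title="", followers=0)
print(p.job_title, p.followers)   # No job_title 0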
We'll open a new DataPipeline
from within our parsing function and pass ProfileData
objects into it.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" display_name: str = "" url: str = "" location: str = "" companies: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ProfileData: name: str = "" company: str = "" company_profile: str = "" job_title: str = "" followers: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def crawl_profiles(name, location, data_pipeline=None, retries=3): first_name = name.split()[0] last_name = name.split()[1] url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") profile_cards = soup.find_all("div", class_="base-search-card__info") for card in profile_cards: href = card.parent.get("href").split("?")[0] name = href.split("/")[-1].split("?")[0] display_name = card.find("h3", class_="base-search-card__title").text location = card.find("p", class_="people-search-card__location").text companies = "n/a" has_companies = card.find("span", class_="entity-list-meta__entities-list") if has_companies: companies = has_companies.text search_data = SearchData( name=name, display_name=display_name, url=href, location=location, companies=companies ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( crawl_profiles, profile_list, [location] * len(profile_list), [data_pipeline] * len(profile_list), [retries] * len(profile_list) ) def scrape_profile(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code != 200: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") head = soup.find("head") script = head.select_one("script[type='application/ld+json']") json_data_graph = json.loads(script.text)["@graph"] json_data = {} person_pipeline = DataPipeline(f"{row['name']}.csv") for element in json_data_graph: if element["@type"] == "Person": json_data = element break company = "n/a" company_profile = "n/a" job_title = "n/a" if "jobTitle" in json_data.keys() and type(json_data["jobTitle"] == list) and len(json_data["jobTitle"]) > 0: job_title = json_data["jobTitle"][0] has_company = "worksFor" in json_data.keys() and len(json_data["worksFor"]) > 0 if has_company: company = json_data["worksFor"][0]["name"] has_company_url = "url" in json_data["worksFor"][0].keys() if has_company_url: company_profile = json_data["worksFor"][0]["url"] has_interactions = 
"interactionStatistic" in json_data.keys() followers = 0 if has_interactions: stats = json_data["interactionStatistic"] if stats["name"] == "Follows" and stats["@type"] == "InteractionCounter": followers = stats["userInteractionCount"] profile_data = ProfileData ( name=row["name"], company=company, company_profile=company_profile, job_title=job_title, followers=followers ) person_pipeline.add_data(profile_data) person_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: scrape_profile(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["bill gates", "elon musk"] ## Job Processes filename = "profile-crawl.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") process_results(filename, LOCATION, retries=MAX_RETRIES)
ProfileData
to represent data scraped from individual profiles.ProfileData
objects directly into a DataPipeline
just like we did with SearchData
earlier in this project. Once again, we use ThreadPoolExecutor
to run our parsing function. Our first argument is scrape_profile
(the function we wish to call).All other arguments to scrape_profile
get passed in as arrays, just like before when we added multithreading.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                scrape_profile,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
get_scrapeops_url()
. We just need to put it in the right place. We're going to change a single line in our parsing function:

response = requests.get(get_scrapeops_url(url, location=location))
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" display_name: str = "" url: str = "" location: str = "" companies: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ProfileData: name: str = "" company: str = "" company_profile: str = "" job_title: str = "" followers: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def crawl_profiles(name, location, data_pipeline=None, retries=3): first_name = name.split()[0] last_name = name.split()[1] url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") profile_cards = soup.find_all("div", class_="base-search-card__info") for card in profile_cards: href = card.parent.get("href").split("?")[0] name = href.split("/")[-1].split("?")[0] display_name = card.find("h3", class_="base-search-card__title").text location = card.find("p", class_="people-search-card__location").text companies = "n/a" has_companies = card.find("span", class_="entity-list-meta__entities-list") if has_companies: companies = has_companies.text search_data = SearchData( name=name, display_name=display_name, url=href, location=location, companies=companies ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( crawl_profiles, profile_list, [location] * len(profile_list), [data_pipeline] * len(profile_list), [retries] * len(profile_list) ) def scrape_profile(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code != 200: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") head = soup.find("head") script = head.select_one("script[type='application/ld+json']") json_data_graph = json.loads(script.text)["@graph"] json_data = {} person_pipeline = DataPipeline(f"{row['name']}.csv") for element in json_data_graph: if element["@type"] == "Person": json_data = element break company = "n/a" company_profile = "n/a" job_title = "n/a" if "jobTitle" in json_data.keys() and type(json_data["jobTitle"] == list) and len(json_data["jobTitle"]) > 0: job_title = json_data["jobTitle"][0] has_company = "worksFor" in json_data.keys() and len(json_data["worksFor"]) > 0 if has_company: company = json_data["worksFor"][0]["name"] has_company_url = "url" in json_data["worksFor"][0].keys() if has_company_url: company_profile = json_data["worksFor"][0]["url"] 
has_interactions = "interactionStatistic" in json_data.keys() followers = 0 if has_interactions: stats = json_data["interactionStatistic"] if stats["name"] == "Follows" and stats["@type"] == "InteractionCounter": followers = stats["userInteractionCount"] profile_data = ProfileData ( name=row["name"], company=company, company_profile=company_profile, job_title=job_title, followers=followers ) person_pipeline.add_data(profile_data) person_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_profile, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["bill gates", "elon musk"] ## Job Processes filename = "profile-crawl.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
MAX_RETRIES
MAX_THREADS
LOCATION
keyword_list
robots.txt
. You can view their terms here and you may view their robots.txt
here.

It's important to note that LinkedIn has strict terms of service regarding data scraping, and scraping LinkedIn profiles without permission can lead to legal issues, including being banned from the platform. Always ensure compliance with LinkedIn's policies and consider using official APIs or getting explicit permission for large-scale data extraction. If you're unsure about your own scraper, consult an attorney.

Then check out ScrapeOps, the complete toolkit for web scraping.
To follow along, create a config.json file. Inside the config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}
.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function crawlProfiles(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const firstName = keyword.split(" ")[0]; const lastName = keyword.split(" ")[1] const page = await browser.newPage(); try { const url = `https://www.linkedin.com/pub/dir?firstName=${firstName}&lastName=${lastName}&trk=people-guest_people-search-bar_search-submit`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const link = await page.evaluate(element => element.parentElement.getAttribute("href"), divCard); const splitLink = link.split("/") const name = splitLink[splitLink.length-1].split("?")[0]; const displayNameElement = await divCard.$("h3[class='base-search-card__title']"); const displayName = await page.evaluate(element => element.textContent, displayNameElement); const locationElement = await page.$("p[class='people-search-card__location']"); const location = await page.evaluate(element => element.textContent, locationElement); let companies = "n/a"; const hasCompanies = await page.$("span[class='entity-list-meta__entities-list']"); if (hasCompanies) { companies = await page.evaluate(element => element.textContent, hasCompanies); } const searchData = { name: name.trim(), display_name: displayName.trim(), url: link.trim(), location: location.trim(), companies: companies.trim() }; console.log(searchData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keywordList, location, retries) { const browser = await puppeteer.launch(); for (const keyword of keywordList) { await crawlProfiles(browser, keyword, location, retries); } await browser.close();} async function main() { const keywords = ["bill gates", "elon musk"]; const concurrencyLimit = 5; const location = "us"; const retries = 3; const aggregateFiles = []; console.log("Crawl starting"); console.time("startCrawl"); for (const keyword of keywords) { aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } await startCrawl(keywords, location, retries); console.timeEnd("startCrawl"); console.log("Crawl complete");} main();
Feel free to change any of the following constants inside main:
keywords
concurrencyLimit
location
retries
LinkedIn search URLs look like this:
https://www.linkedin.com/pub/dir?firstName=bill&lastName=gates&trk=people-guest_people-search-bar_search-submit
The general format is:
https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit
Individual profile URLs look like this:
https://www.linkedin.com/in/williamhgates?trk=people-guest_people_search-card
which reduces to:
https://www.linkedin.com/in/{name_of_profile}
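To make the pattern concrete, here is a small sketch of turning a keyword like "bill gates" into a search URL. The buildSearchUrl() helper is just for illustration; the actual crawler builds this URL inline.

// Illustrative only: the crawler later in this article builds this URL inline.
function buildSearchUrl(keyword) {
    const [firstName, lastName] = keyword.split(" ");
    return `https://www.linkedin.com/pub/dir?firstName=${firstName}&lastName=${lastName}&trk=people-guest_people-search-bar_search-submit`;
}

console.log(buildSearchUrl("bill gates"));
// -> https://www.linkedin.com/pub/dir?firstName=bill&lastName=gates&trk=people-guest_people-search-bar_search-submit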
Each search result is embedded in a div with a class of 'base-search-card__info'. For individual profiles, we pull our data from a JSON blob inside the head of the page. Look at each result: it's a div element, and its class is base-search-card__info.
To control our geolocation, we pass ScrapeOps a country parameter. ScrapeOps then reads this parameter and routes our request through the corresponding country. "country": "us" routes us through a server in the US, and "country": "uk" routes us through a server in the UK.
Start by creating a new project folder and moving into it:
mkdir linkedin-profiles-scraper
cd linkedin-profiles-scraper
npm init --y
npm install puppeteer
npm install csv-writer
npm install csv-parse
You don't need to install fs separately; it ships with Node.js.
Our crawler function is crawlProfiles(). As we discovered earlier, we need to find all of our target div elements. Once we've got them, we'll iterate through them with a for
loop and extract their data.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function crawlProfiles(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const firstName = keyword.split(" ")[0]; const lastName = keyword.split(" ")[1] const page = await browser.newPage(); try { const url = `https://www.linkedin.com/pub/dir?firstName=${firstName}&lastName=${lastName}&trk=people-guest_people-search-bar_search-submit`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const link = await page.evaluate(element => element.parentElement.getAttribute("href"), divCard); const splitLink = link.split("/") const name = splitLink[splitLink.length-1].split("?")[0]; const displayNameElement = await divCard.$("h3[class='base-search-card__title']"); const displayName = await page.evaluate(element => element.textContent, displayNameElement); const locationElement = await page.$("p[class='people-search-card__location']"); const location = await page.evaluate(element => element.textContent, locationElement); let companies = "n/a"; const hasCompanies = await page.$("span[class='entity-list-meta__entities-list']"); if (hasCompanies) { companies = await page.evaluate(element => element.textContent, hasCompanies); } const searchData = { name: name.trim(), display_name: displayName.trim(), url: link.trim(), location: location.trim(), companies: companies.trim() }; console.log(searchData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keywordList, location, retries) { const browser = await puppeteer.launch(); for (const keyword of keywordList) { await crawlProfiles(browser, keyword, location, retries); } await browser.close();} async function main() { const keywords = ["bill gates", "elon musk"]; const concurrencyLimit = 5; const location = "us"; const retries = 3; const aggregateFiles = []; console.log("Crawl starting"); console.time("startCrawl"); for (const keyword of keywords) { aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } await startCrawl(keywords, location, retries); console.timeEnd("startCrawl"); console.log("Crawl complete");} main();
In the code above, await page.$$("div[class='base-search-card__info']") returns all of the profile cards we're looking for. await page.evaluate(element => element.parentElement.getAttribute("href"), divCard) finds our link. await divCard.$("h3[class='base-search-card__title']") yields our displayNameElement, and await page.evaluate(element => element.textContent, displayNameElement) extracts its text. await page.$("p[class='people-search-card__location']") gives us the locationElement, and we extract its text the same way we did for the displayNameElement. Finally, we check for span elements to see if there are companies present; if there are, we extract them, and if not, we assign a default value of "n/a".
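To see those steps in one place, here is a per-card extraction sketch. It assumes we're inside crawlProfiles(), where page is our Puppeteer page and divCard is one of the handles returned by page.$$() above.

// Sketch of the per-card extraction inside crawlProfiles().
// Assumes `page` and `divCard` already exist, as in the code above.
const link = await page.evaluate(element => element.parentElement.getAttribute("href"), divCard);
const name = link.split("/").pop().split("?")[0];

const displayNameElement = await divCard.$("h3[class='base-search-card__title']");
const displayName = await page.evaluate(element => element.textContent, displayNameElement);

const locationElement = await page.$("p[class='people-search-card__location']");
const location = await page.evaluate(element => element.textContent, locationElement);

let companies = "n/a";
const hasCompanies = await page.$("span[class='entity-list-meta__entities-list']");
if (hasCompanies) {
    companies = await page.evaluate(element => element.textContent, hasCompanies);
}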
Next, we need a way to store this data. Time to write our storage function, writeToCsv().
async function writeToCsv(data, outputFile) {
    let success = false;
    while (!success) {
        if (!data || data.length === 0) {
            throw new Error("No data to write!");
        }
        const fileExists = fs.existsSync(outputFile);
        if (!(data instanceof Array)) {
            data = [data];
        }
        const headers = Object.keys(data[0]).map(key => ({id: key, title: key}));
        const csvWriter = createCsvWriter({
            path: outputFile,
            header: headers,
            append: fileExists
        });
        try {
            await csvWriter.writeRecords(data);
            success = true;
        } catch (e) {
            console.log("Failed data", data);
            throw new Error("Failed to write to csv");
        }
    }
}
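Here is a quick usage sketch of writeToCsv(). The record below is a made-up placeholder, purely to show the shape of the data we pass in:

// Usage sketch for writeToCsv(). The record is a dummy placeholder;
// the real crawler passes in the searchData objects it parses from each card.
const sampleRecord = {
    name: "example-profile",
    display_name: "Example Person",
    url: "https://www.linkedin.com/in/example-profile",
    location: "No location",
    companies: "n/a"
};

writeToCsv([sampleRecord], "example.csv")
    .then(() => console.log("Record saved"))
    .catch(err => console.log("Save failed:", err));

Below is our full crawler again, this time with writeToCsv() wired into the parsing loop.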
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function crawlProfiles(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const firstName = keyword.split(" ")[0]; const lastName = keyword.split(" ")[1] const page = await browser.newPage(); try { const url = `https://www.linkedin.com/pub/dir?firstName=${firstName}&lastName=${lastName}&trk=people-guest_people-search-bar_search-submit`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const link = await page.evaluate(element => element.parentElement.getAttribute("href"), divCard); const splitLink = link.split("/") const name = splitLink[splitLink.length-1].split("?")[0]; const displayNameElement = await divCard.$("h3[class='base-search-card__title']"); const displayName = await page.evaluate(element => element.textContent, displayNameElement); const locationElement = await page.$("p[class='people-search-card__location']"); const location = await page.evaluate(element => element.textContent, locationElement); let companies = "n/a"; const hasCompanies = await page.$("span[class='entity-list-meta__entities-list']"); if (hasCompanies) { companies = await page.evaluate(element => element.textContent, hasCompanies); } const searchData = { name: name.trim(), display_name: displayName.trim(), url: link.trim(), location: location.trim(), companies: companies.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keywordList, location, retries) { const browser = await puppeteer.launch(); for (const keyword of keywordList) { await crawlProfiles(browser, keyword, location, retries); } await browser.close();} async function main() { const keywords = ["bill gates", "elon musk"]; const concurrencyLimit = 5; const location = "us"; const retries = 3; const aggregateFiles = []; console.log("Crawl starting"); console.time("startCrawl"); for (const keyword of keywords) { aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } await startCrawl(keywords, location, retries); console.timeEnd("startCrawl"); console.log("Crawl complete");} main();
We still build a searchData object for each result. Now, instead of just printing it, we pass searchData into writeToCsv() and store it to a CSV file.
To search for multiple keywords at once, we'll take advantage of JavaScript's native async
support. Take a look at the example below.

async function startCrawl(keywordList, location, concurrencyLimit, retries) {
    const browser = await puppeteer.launch();

    while (keywordList.length > 0) {
        const currentBatch = keywordList.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(keyword => crawlProfiles(browser, keyword, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }

    await browser.close();
}
Unlike before, we don't iterate with a simple for loop. Instead, we create a list of async tasks and we use Promise.all()
to wait for them all to resolve.When we search for bill gates and elon musk, both of these pages get fetched and parsed concurrently. We wait from the both to resolve before closing the browser and exiting the function.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function crawlProfiles(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const firstName = keyword.split(" ")[0]; const lastName = keyword.split(" ")[1] const page = await browser.newPage(); try { const url = `https://www.linkedin.com/pub/dir?firstName=${firstName}&lastName=${lastName}&trk=people-guest_people-search-bar_search-submit`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const link = await page.evaluate(element => element.parentElement.getAttribute("href"), divCard); const splitLink = link.split("/") const name = splitLink[splitLink.length-1].split("?")[0]; const displayNameElement = await divCard.$("h3[class='base-search-card__title']"); const displayName = await page.evaluate(element => element.textContent, displayNameElement); const locationElement = await page.$("p[class='people-search-card__location']"); const location = await page.evaluate(element => element.textContent, locationElement); let companies = "n/a"; const hasCompanies = await page.$("span[class='entity-list-meta__entities-list']"); if (hasCompanies) { companies = await page.evaluate(element => element.textContent, hasCompanies); } const searchData = { name: name.trim(), display_name: displayName.trim(), url: link.trim(), location: location.trim(), companies: companies.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keywordList, location, concurrencyLimit, retries) { const browser = await puppeteer.launch(); while (keywordList.length > 0) { const currentBatch = keywordList.splice(0, concurrencyLimit); const tasks = currentBatch.map(keyword => crawlProfiles(browser, keyword, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ["bill gates", "elon musk"]; const concurrencyLimit = 5; const location = "us"; const retries = 3; const aggregateFiles = []; console.log("Crawl starting"); console.time("startCrawl"); for (const keyword of keywords) { aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } await startCrawl(keywords, location, concurrencyLimit, retries); 
console.timeEnd("startCrawl"); console.log("Crawl complete");} main();
To get past LinkedIn's anti-bot protection, we route our requests through the ScrapeOps Proxy API. getScrapeOpsUrl() takes a regular URL and combines it with our api_key and location using some URL encoding. Then it'll return a new ScrapeOps proxied URL. When we talk to the ScrapeOps API, the country param tells ScrapeOps our location of choice. ScrapeOps then routes us through a server based in that location. There are many other options we can use, such as residential and mobile, but typically, our country
parameter is enough.

function getScrapeOpsUrl(url, location="us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}
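If you ever do need one of those extras, the sketch below shows roughly how it would slot into our helper. The residential flag here is an assumption based on the options mentioned above; double-check the exact parameter name and cost multiplier in the ScrapeOps docs before relying on it.

// Hypothetical variant: requesting a residential proxy.
// "residential" is assumed from the options listed above -- verify it in the ScrapeOps docs.
function getResidentialScrapeOpsUrl(url, location = "us") {
    const params = new URLSearchParams({
        api_key: API_KEY,
        url: url,
        country: location,
        residential: true
    });
    return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}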
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function crawlProfiles(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const firstName = keyword.split(" ")[0]; const lastName = keyword.split(" ")[1] const page = await browser.newPage(); try { const url = `https://www.linkedin.com/pub/dir?firstName=${firstName}&lastName=${lastName}&trk=people-guest_people-search-bar_search-submit`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const link = await page.evaluate(element => element.parentElement.getAttribute("href"), divCard); const splitLink = link.split("/") const name = splitLink[splitLink.length-1].split("?")[0]; const displayNameElement = await divCard.$("h3[class='base-search-card__title']"); const displayName = await page.evaluate(element => element.textContent, displayNameElement); const locationElement = await page.$("p[class='people-search-card__location']"); const location = await page.evaluate(element => element.textContent, locationElement); let companies = "n/a"; const hasCompanies = await page.$("span[class='entity-list-meta__entities-list']"); if (hasCompanies) { companies = await page.evaluate(element => element.textContent, hasCompanies); } const searchData = { name: name.trim(), display_name: displayName.trim(), url: link.trim(), location: location.trim(), companies: companies.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keywordList, location, concurrencyLimit, retries) { const browser = await puppeteer.launch(); while (keywordList.length > 0) { const currentBatch = keywordList.splice(0, concurrencyLimit); const tasks = currentBatch.map(keyword => crawlProfiles(browser, keyword, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ["bill gates", "elon musk"]; const concurrencyLimit = 5; const location = "us"; const retries = 3; const aggregateFiles = []; console.log("Crawl starting"); console.time("startCrawl"); for (const keyword of keywords) { aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } await startCrawl(keywords, 
location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete");} main();
If you'd like to tweak your crawl, change any of the following constants inside the main() function:
keywords
concurrencyLimit
location
retries
Now it's time to scrape the individual profiles. Our parsing function, processProfile(), fetches a profile. We find the head of the page. From inside the head, we pull the JSON blob that contains all of our profile data.

async function processProfile(browser, row, location, retries = 3) {
    const url = row.url;
    let tries = 0;
    let success = false;

    while (tries <= retries && !success) {
        const page = await browser.newPage();

        try {
            const response = await page.goto(url);
            if (!response || response.status() !== 200) {
                throw new Error(`Failed to fetch page, status: ${response ? response.status() : "no response"}`);
            }

            const head = await page.$("head");
            const scriptElement = await head.$("script[type='application/ld+json']");
            const jsonText = await page.evaluate(element => element.textContent, scriptElement);
            const jsonDataGraph = JSON.parse(jsonText)["@graph"];

            let jsonData = {};
            for (const element of jsonDataGraph) {
                if (element["@type"] === "Person") {
                    jsonData = element;
                    break;
                }
            }

            let company = "n/a";
            let companyProfile = "n/a";
            let jobTitle = "n/a";

            if ("jobTitle" in jsonData && Array.isArray(jsonData.jobTitle) && jsonData.jobTitle.length > 0) {
                jobTitle = jsonData.jobTitle[0];
            }

            const hasCompany = "worksFor" in jsonData && jsonData.worksFor.length > 0;
            if (hasCompany) {
                company = jsonData.worksFor[0].name;
                const hasCompanyUrl = "url" in jsonData.worksFor[0];
                if (hasCompanyUrl) {
                    companyProfile = jsonData.worksFor[0].url;
                }
            }

            const hasInteractions = "interactionStatistic" in jsonData;
            let followers = 0;
            if (hasInteractions) {
                const stats = jsonData.interactionStatistic;
                if (stats.name === "Follows" && stats["@type"] === "InteractionCounter") {
                    followers = stats.userInteractionCount;
                }
            }

            const profileData = {
                name: row.name,
                company: company,
                company_profile: companyProfile,
                job_title: jobTitle,
                followers: followers
            };

            console.log(profileData);
            success = true;
            console.log("Successfully parsed", row.url);
        } catch (err) {
            tries++;
            console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`);
        } finally {
            await page.close();
        }
    }
}
First, we find the head of the page: await page.$("head"). Then, await head.$("script[type='application/ld+json']") finds the JSON blob inside the head. We iterate through the "@graph" array until we find an element whose "@type" is "Person", and we use this "Person" object to extract our data:
company: the company that a person works for.
company_profile: the company's LinkedIn profile.
job_title: the person's official job title.
followers: the number of other people following this person.
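To make the parsing easier to follow, here is a trimmed, hypothetical example of what the "Person" node inside "@graph" can look like. The field names match what our parser reads; the values are invented, and real profiles vary:

{
    "@type": "Person",
    "name": "Example Person",
    "jobTitle": ["Co-chair"],
    "worksFor": [
        {
            "name": "Example Foundation",
            "url": "https://www.linkedin.com/company/example-foundation"
        }
    ],
    "interactionStatistic": {
        "@type": "InteractionCounter",
        "name": "Follows",
        "userInteractionCount": 12345
    }
}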
Next comes processResults(). The goal here is simple: read our CSV file into an array of JSON objects, then run processProfile() on each profile from the array. We set this function up a lot like the startCrawl() function from earlier. You might notice that we take a concurrencyLimit as one of our arguments. We don't do anything with it now, but we'll use it when we add concurrency later.

async function processResults(csvFile, location, concurrencyLimit, retries) {
    const rows = await readCsv(csvFile);
    const browser = await puppeteer.launch();

    for (const row of rows) {
        await processProfile(browser, row, location, retries);
    }

    await browser.close();
}
processResults() depends on one more helper, readCsv(), which reads a CSV file into an array of JSON objects:

async function readCsv(inputFile) {
    const results = [];
    const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({
        columns: true,
        delimiter: ",",
        trim: true,
        skip_empty_lines: true
    }));

    for await (const record of parser) {
        results.push(record);
    }
    return results;
}
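As a quick sanity check, you can load a crawl report back in on its own. The filename below assumes you already crawled "bill gates", which produces bill-gates.csv:

// Usage sketch: read a crawl report and inspect the first row.
readCsv("bill-gates.csv")
    .then(rows => {
        console.log(`Loaded ${rows.length} profiles`);
        console.log(rows[0]);
    })
    .catch(err => console.log("Failed to read CSV:", err));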
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function crawlProfiles(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const firstName = keyword.split(" ")[0]; const lastName = keyword.split(" ")[1] const page = await browser.newPage(); try { const url = `https://www.linkedin.com/pub/dir?firstName=${firstName}&lastName=${lastName}&trk=people-guest_people-search-bar_search-submit`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const link = await page.evaluate(element => element.parentElement.getAttribute("href"), divCard); const splitLink = link.split("/") const name = splitLink[splitLink.length-1].split("?")[0]; const displayNameElement = await divCard.$("h3[class='base-search-card__title']"); const displayName = await page.evaluate(element => element.textContent, displayNameElement); const locationElement = await page.$("p[class='people-search-card__location']"); const location = await page.evaluate(element => element.textContent, locationElement); let companies = "n/a"; const hasCompanies = await page.$("span[class='entity-list-meta__entities-list']"); if (hasCompanies) { companies = await page.evaluate(element => element.textContent, hasCompanies); } const searchData = { name: name.trim(), display_name: displayName.trim(), url: link.trim(), location: location.trim(), companies: companies.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keywordList, location, concurrencyLimit, retries) { const browser = await puppeteer.launch(); while (keywordList.length > 0) { const currentBatch = keywordList.splice(0, concurrencyLimit); const tasks = currentBatch.map(keyword => crawlProfiles(browser, keyword, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processProfile(browser, row, location, retries = 3) { const url 
= row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await page.goto(url); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const head = await page.$("head"); const scriptElement = await head.$("script[type='application/ld+json']"); const jsonText = await page.evaluate(element => element.textContent, scriptElement); const jsonDataGraph = JSON.parse(jsonText)["@graph"]; let jsonData = {}; for (const element of jsonDataGraph) { if (element["@type"] === "Person") { jsonData = element; break; } } let company = "n/a"; let companyProfile = "n/a"; let jobTitle = "n/a"; if ("jobTitle" in jsonData && Array.isArray(jsonData.jobTitle) && jsonData.jobTitle.length > 0) { jobTitle = jsonData.jobTitle[0]; } const hasCompany = "worksFor" in jsonData && jsonData.worksFor.length > 0; if (hasCompany) { company = jsonData.worksFor[0].name; const hasCompanyUrl = "url" in jsonData.worksFor[0]; if (hasCompanyUrl) { companyProfile = jsonData.worksFor[0].url } } const hasInteractions = "interactionStatistic" in jsonData; let followers = 0; if (hasInteractions) { const stats = jsonData.interactionStatistic; if (stats.name === "Follows" && stats["@type"] === "InteractionCounter") { followers = stats.userInteractionCount; } } const profileData = { name: row.name, company: company, company_profile: companyProfile, job_title: jobTitle, followers: followers } console.log(profileData); success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } } async function processResults(csvFile, location, concurrencyLimit, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; for (const row of rows) { await processProfile(browser, row, location, retries); } await browser.close(); } async function main() { const keywords = ["bill gates", "elon musk"]; const concurrencyLimit = 5; const location = "us"; const retries = 3; const aggregateFiles = []; console.log("Crawl starting"); console.time("startCrawl"); for (const keyword of keywords) { aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } await startCrawl(keywords, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); console.log("Starting scrape"); for (const file of aggregateFiles) { console.log(file) console.time("processResults"); await processResults(file, location, concurrencyLimit, retries); console.timeEnd("processResults"); } console.log("Scrape complete");} main();
To recap: processProfile() extracts data from individual profiles, and processResults() reads our CSV file and runs processProfile() on all of the profiles from that CSV. writeToCsv() already gives us the ability to write JSON objects to a CSV file, and we already convert our extracted data into a JSON object. Instead of printing our JSON object to the console, we need to pass it into writeToCsv()
. That's the only line that changes here.Here's our fully updated code.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function crawlProfiles(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const firstName = keyword.split(" ")[0]; const lastName = keyword.split(" ")[1] const page = await browser.newPage(); try { const url = `https://www.linkedin.com/pub/dir?firstName=${firstName}&lastName=${lastName}&trk=people-guest_people-search-bar_search-submit`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const link = await page.evaluate(element => element.parentElement.getAttribute("href"), divCard); const splitLink = link.split("/") const name = splitLink[splitLink.length-1].split("?")[0]; const displayNameElement = await divCard.$("h3[class='base-search-card__title']"); const displayName = await page.evaluate(element => element.textContent, displayNameElement); const locationElement = await page.$("p[class='people-search-card__location']"); const location = await page.evaluate(element => element.textContent, locationElement); let companies = "n/a"; const hasCompanies = await page.$("span[class='entity-list-meta__entities-list']"); if (hasCompanies) { companies = await page.evaluate(element => element.textContent, hasCompanies); } const searchData = { name: name.trim(), display_name: displayName.trim(), url: link.trim(), location: location.trim(), companies: companies.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keywordList, location, concurrencyLimit, retries) { const browser = await puppeteer.launch(); while (keywordList.length > 0) { const currentBatch = keywordList.splice(0, concurrencyLimit); const tasks = currentBatch.map(keyword => crawlProfiles(browser, keyword, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async 
function processProfile(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await page.goto(url); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const head = await page.$("head"); const scriptElement = await head.$("script[type='application/ld+json']"); const jsonText = await page.evaluate(element => element.textContent, scriptElement); const jsonDataGraph = JSON.parse(jsonText)["@graph"]; let jsonData = {}; for (const element of jsonDataGraph) { if (element["@type"] === "Person") { jsonData = element; break; } } let company = "n/a"; let companyProfile = "n/a"; let jobTitle = "n/a"; if ("jobTitle" in jsonData && Array.isArray(jsonData.jobTitle) && jsonData.jobTitle.length > 0) { jobTitle = jsonData.jobTitle[0]; } const hasCompany = "worksFor" in jsonData && jsonData.worksFor.length > 0; if (hasCompany) { company = jsonData.worksFor[0].name; const hasCompanyUrl = "url" in jsonData.worksFor[0]; if (hasCompanyUrl) { companyProfile = jsonData.worksFor[0].url } } const hasInteractions = "interactionStatistic" in jsonData; let followers = 0; if (hasInteractions) { const stats = jsonData.interactionStatistic; if (stats.name === "Follows" && stats["@type"] === "InteractionCounter") { followers = stats.userInteractionCount; } } const profileData = { name: row.name, company: company, company_profile: companyProfile, job_title: jobTitle, followers: followers } await writeToCsv([profileData], `${row.name.replace(" ", "-")}.csv`); success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } } async function processResults(csvFile, location, concurrencyLimit, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; for (const row of rows) { await processProfile(browser, row, location, retries); } await browser.close(); } async function main() { const keywords = ["bill gates", "elon musk"]; const concurrencyLimit = 5; const location = "us"; const retries = 3; const aggregateFiles = []; console.log("Crawl starting"); console.time("startCrawl"); for (const keyword of keywords) { aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } await startCrawl(keywords, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); console.log("Starting scrape"); for (const file of aggregateFiles) { console.log(file) console.time("processResults"); await processResults(file, location, concurrencyLimit, retries); console.timeEnd("processResults"); } console.log("Scrape complete");} main();
We now pass profileData into writeToCsv(). This stores our extracted data safely.
Remember the concurrencyLimit from before? Now it's time to actually use it. Here, we'll once again use splice() to cut our array into chunks. We convert each chunk into an array of async tasks. Then we await our tasks using Promise.all()
so each task can resolve.

async function processResults(csvFile, location, concurrencyLimit, retries) {
    const rows = await readCsv(csvFile);
    const browser = await puppeteer.launch();

    while (rows.length > 0) {
        const currentBatch = rows.splice(0, concurrencyLimit);
        const tasks = currentBatch.map(row => processProfile(browser, row, location, retries));

        try {
            await Promise.all(tasks);
        } catch (err) {
            console.log(`Failed to process batch: ${err}`);
        }
    }

    await browser.close();
}
The last piece is getScrapeOpsUrl(). We need it to get past any anti-bots LinkedIn uses on the profile pages as well. We're going to change one line in our parsing function, the await page.goto() call.
.const response = await page.goto(getScrapeOpsUrl(url, location), { timeout: 0 });
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = JSON.parse(fs.readFileSync("config.json")).api_key; async function writeToCsv(data, outputFile) { let success = false; while (!success) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); if (!(data instanceof Array)) { data = [data] } const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); success = true; } catch (e) { console.log("Failed data", data); throw new Error("Failed to write to csv"); } }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function crawlProfiles(browser, keyword, location="us", retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const firstName = keyword.split(" ")[0]; const lastName = keyword.split(" ")[1] const page = await browser.newPage(); try { const url = `https://www.linkedin.com/pub/dir?firstName=${firstName}&lastName=${lastName}&trk=people-guest_people-search-bar_search-submit`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl, { timeout: 0 }); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[class='base-search-card__info']"); for (const divCard of divCards) { const link = await page.evaluate(element => element.parentElement.getAttribute("href"), divCard); const splitLink = link.split("/") const name = splitLink[splitLink.length-1].split("?")[0]; const displayNameElement = await divCard.$("h3[class='base-search-card__title']"); const displayName = await page.evaluate(element => element.textContent, displayNameElement); const locationElement = await page.$("p[class='people-search-card__location']"); const location = await page.evaluate(element => element.textContent, locationElement); let companies = "n/a"; const hasCompanies = await page.$("span[class='entity-list-meta__entities-list']"); if (hasCompanies) { companies = await page.evaluate(element => element.textContent, hasCompanies); } const searchData = { name: name.trim(), display_name: displayName.trim(), url: link.trim(), location: location.trim(), companies: companies.trim() }; await writeToCsv([searchData], `${keyword.replace(" ", "-")}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startCrawl(keywordList, location, concurrencyLimit, retries) { const browser = await puppeteer.launch(); while (keywordList.length > 0) { const currentBatch = keywordList.splice(0, concurrencyLimit); const tasks = currentBatch.map(keyword => crawlProfiles(browser, keyword, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function processProfile(browser, row, location, retries = 3) { const url 
= row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { const response = await page.goto(getScrapeOpsUrl(url, location), { timeout: 0 }); if (!response || response.status() !== 200) { throw new Error("Failed to fetch page, status:", response.status()); } const head = await page.$("head"); const scriptElement = await head.$("script[type='application/ld+json']"); const jsonText = await page.evaluate(element => element.textContent, scriptElement); const jsonDataGraph = JSON.parse(jsonText)["@graph"]; let jsonData = {}; for (const element of jsonDataGraph) { if (element["@type"] === "Person") { jsonData = element; break; } } let company = "n/a"; let companyProfile = "n/a"; let jobTitle = "n/a"; if ("jobTitle" in jsonData && Array.isArray(jsonData.jobTitle) && jsonData.jobTitle.length > 0) { jobTitle = jsonData.jobTitle[0]; } const hasCompany = "worksFor" in jsonData && jsonData.worksFor.length > 0; if (hasCompany) { company = jsonData.worksFor[0].name; const hasCompanyUrl = "url" in jsonData.worksFor[0]; if (hasCompanyUrl) { companyProfile = jsonData.worksFor[0].url } } const hasInteractions = "interactionStatistic" in jsonData; let followers = 0; if (hasInteractions) { const stats = jsonData.interactionStatistic; if (stats.name === "Follows" && stats["@type"] === "InteractionCounter") { followers = stats.userInteractionCount; } } const profileData = { name: row.name, company: company, company_profile: companyProfile, job_title: jobTitle, followers: followers } await writeToCsv([profileData], `${row.name.replace(" ", "-")}.csv`); success = true; console.log("Successfully parsed", row.url); } catch (err) { tries++; console.log(`Error: ${err}, tries left: ${retries-tries}, url: ${getScrapeOpsUrl(url)}`); } finally { await page.close(); } } } async function processResults(csvFile, location, concurrencyLimit, retries) { const rows = await readCsv(csvFile); const browser = await puppeteer.launch();; while (rows.length > 0) { const currentBatch = rows.splice(0, concurrencyLimit); const tasks = currentBatch.map(row => processProfile(browser, row, location, retries)); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close(); } async function main() { const keywords = ["bill gates", "elon musk"]; const concurrencyLimit = 5; const location = "us"; const retries = 3; const aggregateFiles = []; console.log("Crawl starting"); console.time("startCrawl"); for (const keyword of keywords) { aggregateFiles.push(`${keyword.replace(" ", "-")}.csv`); } await startCrawl(keywords, location, concurrencyLimit, retries); console.timeEnd("startCrawl"); console.log("Crawl complete"); console.log("Starting scrape"); for (const file of aggregateFiles) { console.log(file) console.time("processResults"); await processResults(file, location, concurrencyLimit, retries); console.timeEnd("processResults"); } console.log("Scrape complete");} main();
As before, feel free to change any of the following constants in main():
keywords
concurrencyLimit
location
retries
Remember to respect LinkedIn's terms of service and robots.txt. You can view their terms here and you may view their robots.txt here. Failure to comply with these policies can result in suspension or even permanent removal of your LinkedIn account. If you're unsure whether your scraper is legal or not, consult an attorney.