

How to Scrape LinkedIn Profiles With Requests and BeautifulSoup

LinkedIn is a social network for professionals. Scraping LinkedIn profiles can be a valuable technique for gathering professional data for research, recruitment, or networking purposes. When scraping LinkedIn profiles, you can typically access data which provides a comprehensive overview of a user's professional background, expertise, and network.

In this guide, we'll explore how to scrape LinkedIn profiles effectively.


TLDR - How to Scrape LinkedIn Profiles

If you're looking to scrape LinkedIn Profiles, take a look at our pre-built scraper below.

  1. It first performs a crawl and generates a report.
  2. It then reads the crawl report and scrapes individual data from each profile we found during the crawl.

First, create a new project folder with a config.json file. Inside the config file, add your ScrapeOps API key, {"api_key": "your-super-secret-api-key"}.

Then, copy and paste the code below into a Python file.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    display_name: str = ""
    url: str = ""
    location: str = ""
    companies: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ProfileData:
    name: str = ""
    company: str = ""
    company_profile: str = ""
    job_title: str = ""
    followers: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def crawl_profiles(name, location, data_pipeline=None, retries=3):
    first_name = name.split()[0]
    last_name = name.split()[1]
    url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            profile_cards = soup.find_all("div", class_="base-search-card__info")
            for card in profile_cards:
                href = card.parent.get("href").split("?")[0]
                name = href.split("/")[-1].split("?")[0]
                display_name = card.find("h3", class_="base-search-card__title").text
                location = card.find("p", class_="people-search-card__location").text
                companies = "n/a"
                has_companies = card.find("span", class_="entity-list-meta__entities-list")
                if has_companies:
                    companies = has_companies.text

                search_data = SearchData(
                    name=name,
                    display_name=display_name,
                    url=href,
                    location=location,
                    companies=companies
                )

                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            crawl_profiles,
            profile_list,
            [location] * len(profile_list),
            [data_pipeline] * len(profile_list),
            [retries] * len(profile_list)
        )


def scrape_profile(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            head = soup.find("head")
            script = head.select_one("script[type='application/ld+json']")
            json_data_graph = json.loads(script.text)["@graph"]
            json_data = {}
            person_pipeline = DataPipeline(f"{row['name']}.csv")
            for element in json_data_graph:
                if element["@type"] == "Person":
                    json_data = element
                    break

            company = "n/a"
            company_profile = "n/a"
            job_title = "n/a"

            if "jobTitle" in json_data.keys() and isinstance(json_data["jobTitle"], list) and len(json_data["jobTitle"]) > 0:
                job_title = json_data["jobTitle"][0]

            has_company = "worksFor" in json_data.keys() and len(json_data["worksFor"]) > 0
            if has_company:
                company = json_data["worksFor"][0]["name"]
                has_company_url = "url" in json_data["worksFor"][0].keys()
                if has_company_url:
                    company_profile = json_data["worksFor"][0]["url"]

            has_interactions = "interactionStatistic" in json_data.keys()
            followers = 0
            if has_interactions:
                stats = json_data["interactionStatistic"]
                if stats["name"] == "Follows" and stats["@type"] == "InteractionCounter":
                    followers = stats["userInteractionCount"]

            profile_data = ProfileData(
                name=row["name"],
                company=company,
                company_profile=company_profile,
                job_title=job_title,
                followers=followers
            )
            person_pipeline.add_data(profile_data)
            person_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                scrape_profile,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["bill gates", "elon musk"]

    ## Job Processes
    filename = "profile-crawl.csv"
    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")

    process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

How To Architect Our LinkedIn Profiles Scraper

Scraping LinkedIn is a tricky job. LinkedIn is notoriously hard to scrape because of the anti-bot protocols they put in place. However, with some due diligence, we can get around all of that.

We're going to build a profile crawler and a profile scraper.

  • Our crawler takes in a keyword and searches for it. For instance, if we want to look up Bill Gates, our crawler will perform that search and save each Bill Gates that it finds in the results.
  • Our scraper will then look at all these saved profiles and look them up individually to find things on their profile pages.

At a high level, our profile crawler needs to:

  1. Perform a search and parse the search results.
  2. Store those parsed results.
  3. Concurrently run steps 1 and 2 on multiple searches.
  4. Use proxy integration to get past LinkedIn's anti-bots.

Our profile scraper needs to perform these steps:

  1. Read the crawler's report into an array.
  2. Parse a row from the array.
  3. Store parsed profile data.
  4. Run steps 2 and 3 on multiple pages concurrently.
  5. Utilize a proxy to bypass anti-bots.

Understanding How To Scrape LinkedIn Profiles

Now, we're going to get a feel for the webpages we're looking at. After we know what our pages look like, we're going to see where their data is located.

We'll also use the ScrapeOps Proxy API to handle our geolocation. These next few steps allow us to properly plan out our program before writing it.


Step 1: How To Request LinkedIn Profiles Pages

First, we need to learn how to GET LinkedIn profile pages. There are two pages we need to GET: the search results page and the individual profile page. Check out the images below for a better understanding of these types of pages.

Below is a search for Bill Gates. Our URL is:

https://www.linkedin.com/pub/dir?firstName=bill&lastName=gates&trk=people-guest_people-search-bar_search-submit

As you can see, we're prompted to sign in as soon as we get to the page, but this isn't really an issue because our full page is still intact under the prompt.

Our final URL format looks like this:

https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit
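For illustration, here's a minimal sketch of how we'll build that search URL from a full name later in this guide; the names below are just example inputs.

first_name = "bill"
last_name = "gates"
# Plug the name into the public directory search URL we found above
url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
print(url)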

LinkedIn Search Results

Next, let's take a look at how our individual profiles are laid out. Here's a look at the profile of Bill Gates. While we're once again prompted to sign in, the page is intact. Our URL is:

https://www.linkedin.com/in/williamhgates?trk=people-guest_people_search-card

When we reconstruct these links, we strip the query string so they look like this:

https://www.linkedin.com/in/williamhgates

We remove the queries at the end because (for some reason) anti-bots are less likely to block us when we format the URL this way.
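Here's a quick sketch of that cleanup, using the same split("?") trick our scraper uses later.

raw_href = "https://www.linkedin.com/in/williamhgates?trk=people-guest_people_search-card"
# Drop everything after the "?" to get the clean profile URL
profile_url = raw_href.split("?")[0]
print(profile_url)  # https://www.linkedin.com/in/williamhgates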

Bill Gates LinkedIn Profile


Step 2: How To Extract Data From LinkedIn Profiles Results and Pages

Now let's look at the data we're going to extract. On the search results page, we get our data strictly from the HTML on the page. For individual profile pages, we get it from a JSON blob inside the page.

In the image below, you can see that each search card has its data embedded inside a div with a class of base-search-card__info.

HTML Inspection LinkedIn Search Results Page

In our next image, you can see the JSON blob from the profile page.

HTML Inspection LinkedIn Profile Page
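To make that concrete, here is a minimal sketch of both extractions. It assumes you've already fetched the raw HTML into search_page_html and profile_page_html (hypothetical variable names); the selectors are the same ones our scraper uses later.

import json
from bs4 import BeautifulSoup

# Search results page: each profile card lives inside this div
search_soup = BeautifulSoup(search_page_html, "html.parser")
profile_cards = search_soup.find_all("div", class_="base-search-card__info")

# Individual profile page: the data sits in an ld+json blob inside the <head>
profile_soup = BeautifulSoup(profile_page_html, "html.parser")
script = profile_soup.find("head").select_one("script[type='application/ld+json']")
json_data_graph = json.loads(script.text)["@graph"]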


Step 3: Geolocated Data

In order to handle geolocation, we'll use the ScrapeOps Proxy API. The ScrapeOps API gives us the option to pass a country parameter and we'll get routed through a country of our choosing. If we want to appear in the US, we can pass "country": "us".

You can view the full list of supported countries on this page.
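For illustration, here's a minimal sketch of building a proxied URL by hand. The payload keys mirror the get_scrapeops_url() function we write later in this article, and the API key shown is just a placeholder.

from urllib.parse import urlencode

API_KEY = "your-super-secret-api-key"  # placeholder, use your real ScrapeOps API key

payload = {
    "api_key": API_KEY,
    "url": "https://www.linkedin.com/in/williamhgates",
    "country": "us",  # route the request through a US-based IP
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)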


Setting Up Our LinkedIn Profiles Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir linkedin-profiles-scraper

cd linkedin-profiles-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate
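If you're on Windows, the activation command is slightly different:

venv\Scripts\activate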

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build A LinkedIn Profiles Search Crawler

Now that we know what we're doing, we can go through and actually build our scraping project. We'll start by building a crawler. As previously mentioned, our crawler needs to perform a search, parse the results and then store the data.

On top of all that, it needs to be able to run multiple searches concurrently and it needs to integrate with a proxy.


Step 1: Create Simple Search Data Parser

To start, we're going to build a basic parser. In this iteration of our script, we'll add error handling, retry logic, and a basic parsing function. This gives us the initial scaffolding that we need to build everything else. Take a look at the parsing function in this script, crawl_profiles().

First, we find all of our div elements. Then, we iterate through them and pull the relevant data from them.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def crawl_profiles(name, location, retries=3):
    first_name = name.split()[0]
    last_name = name.split()[1]
    url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            profile_cards = soup.find_all("div", class_="base-search-card__info")
            for card in profile_cards:
                href = card.parent.get("href").split("?")[0]
                name = href.split("/")[-1].split("?")[0]
                display_name = card.find("h3", class_="base-search-card__title").text
                location = card.find("p", class_="people-search-card__location").text
                companies = "n/a"
                has_companies = card.find("span", class_="entity-list-meta__entities-list")
                if has_companies:
                    companies = has_companies.text

                search_data = {
                    "name": name,
                    "display_name": display_name,
                    "url": href,
                    "location": location,
                    "companies": companies
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_crawl(profile_list, location, retries=3):
    for name in profile_list:
        crawl_profiles(name, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["bill gates", "elon musk"]

    ## Job Processes
    filename = "profile-crawl.csv"
    start_crawl(keyword_list, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
  • soup.find_all("div", class_="base-search-card__info") gets all of our profile cards for us.
  • As we iterate through the profile cards:
    • We use card.parent.get("href").split("?")[0] to get the link to each profile.
    • Our profile name gets extracted from the link.
    • We find the h3 and pull the display name from it.
    • We pull the location from the card's p element.
    • We check the span elements to see if there are companies present and if there are companies, we extract them.

Step 2: Storing the Scraped Data

Now that we're extracting our data, we need to store it properly. To accomplish this, we're going to write two different classes. First, we'll make a dataclass called SearchData. Afterward, we'll create a DataPipeline.

Here is our SearchData. We use it to represent the objects we've been scraping.

@dataclass
class SearchData:
    name: str = ""
    display_name: str = ""
    url: str = ""
    location: str = ""
    companies: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Once we've got our SearchData, we need to store it. To do this, we'll pass it into a DataPipeline. Our pipeline in the snippet below takes in a dataclass and saves it to a CSV file. If the CSV already exists, we open it in append mode, otherwise we write a new one. On top of that, our DataPipeline also has some logic for filtering out duplicates.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)  # requires "import time" at the top of the script
        if len(self.storage_queue) > 0:
            self.save_to_csv()

With these two classes added in, our code now looks like this.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
display_name: str = ""
url: str = ""
location: str = ""
companies: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def crawl_profiles(name, location, data_pipeline=None, retries=3):
first_name = name.split()[0]
last_name = name.split()[1]
url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")



soup = BeautifulSoup(response.text, "html.parser")
profile_cards = soup.find_all("div", class_="base-search-card__info")
for card in profile_cards:
href = card.parent.get("href").split("?")[0]
name = href.split("/")[-1].split("?")[0]
display_name = card.find("h3", class_="base-search-card__title").text
location = card.find("p", class_="people-search-card__location").text
companies = "n/a"
has_companies = card.find("span", class_="entity-list-meta__entities-list")
if has_companies:
companies = has_companies.text

search_data = SearchData(
name=name,
display_name=display_name,
url=href,
location=location,
companies=companies
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_crawl(profile_list, location, data_pipeline=None, retries=3):
for name in profile_list:
crawl_profiles(name, location, data_pipeline=data_pipeline, retries=retries)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["bill gates", "elon musk"]

## Job Processes
filename = "profile-crawl.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
  • We use our scraped data to create SearchData objects.
  • We pass all of these objects into a DataPipeline.

Step 3: Adding Concurrency

Our crawler should be able to handle multiple searches simultaneously.

In order to handle this, we'll make use of ThreadPoolExecutor. It opens up a new thread pool with a max_threads argument. Then, it runs a function of our choice on each available thread.

Take a look at the example below.

def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            crawl_profiles,
            profile_list,
            [location] * len(profile_list),
            [data_pipeline] * len(profile_list),
            [retries] * len(profile_list)
        )

Instead of a for loop, we open up a new thread pool and pass crawl_profiles into it. All other arguments get passed in as arrays. ThreadPoolExecutor takes these arrays and passes each element from each array into an individual instance of crawl_profiles.
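If the array juggling looks odd, here's a tiny, self-contained sketch of the same pattern with a toy function, so you can see how executor.map lines the arguments up element by element.

import concurrent.futures

def greet(name, location, retries):
    # Each call receives one element from each of the iterables passed to map()
    print(f"{name} | {location} | retries={retries}")

names = ["bill gates", "elon musk"]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(greet, names, ["us"] * len(names), [3] * len(names))

With that pattern in mind, here is our full crawler with concurrency added.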

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
display_name: str = ""
url: str = ""
location: str = ""
companies: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def crawl_profiles(name, location, data_pipeline=None, retries=3):
first_name = name.split()[0]
last_name = name.split()[1]
url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")



soup = BeautifulSoup(response.text, "html.parser")
profile_cards = soup.find_all("div", class_="base-search-card__info")
for card in profile_cards:
href = card.parent.get("href").split("?")[0]
name = href.split("/")[-1].split("?")[0]
display_name = card.find("h3", class_="base-search-card__title").text
location = card.find("p", class_="people-search-card__location").text
companies = "n/a"
has_companies = card.find("span", class_="entity-list-meta__entities-list")
if has_companies:
companies = has_companies.text

search_data = SearchData(
name=name,
display_name=display_name,
url=href,
location=location,
companies=companies
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
crawl_profiles,
profile_list,
[location] * len(profile_list),
[data_pipeline] * len(profile_list),
[retries] * len(profile_list)
)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["bill gates", "elon musk"]

## Job Processes
filename = "profile-crawl.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

Step 4: Bypassing Anti-Bots

Bypassing anti-bots is pretty straightforward. Here, we're going to write a magical function that takes a URL as an argument and a location as a kwarg.

It then creates a payload and wraps all this information into a new URL that routes our page through the ScrapeOps Proxy API.

When talking to the ScrapeOps API, we can use the country param to choose our location. There are many other options we can use, such as residential and mobile, but typically our country parameter is enough.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

Here is the code for our crawler now that it's ready for production.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
display_name: str = ""
url: str = ""
location: str = ""
companies: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def crawl_profiles(name, location, data_pipeline=None, retries=3):
first_name = name.split()[0]
last_name = name.split()[1]
url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")



soup = BeautifulSoup(response.text, "html.parser")
profile_cards = soup.find_all("div", class_="base-search-card__info")
for card in profile_cards:
href = card.parent.get("href").split("?")[0]
name = href.split("/")[-1].split("?")[0]
display_name = card.find("h3", class_="base-search-card__title").text
location = card.find("p", class_="people-search-card__location").text
companies = "n/a"
has_companies = card.find("span", class_="entity-list-meta__entities-list")
if has_companies:
companies = has_companies.text

search_data = SearchData(
name=name,
display_name=display_name,
url=href,
location=location,
companies=companies
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
crawl_profiles,
profile_list,
[location] * len(profile_list),
[data_pipeline] * len(profile_list),
[retries] * len(profile_list)
)



if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["bill gates", "elon musk"]

## Job Processes
filename = "profile-crawl.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

Step 5: Production Run

Alright! Time to run our crawler in production. We're going to run with 5 threads. If you want different results, feel free to change any of the following constants in our main:

  • MAX_RETRIES
  • MAX_THREADS
  • LOCATION
  • keyword_list

You can run the script with the following command: python name_of_your_script.py.

Crawler Performance Results Terminal

As you can see above, we crawled two names in 7.167 seconds: 7.167 / 2 = 3.584 seconds per search. It's not uncommon for a single LinkedIn page to take 7 to 10 seconds, so these results are pretty good.
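If you'd like to benchmark your own runs, a minimal sketch (reusing the variables from our main block above) is to wrap the crawl in a couple of time.time() calls:

import time

start_time = time.time()
start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
elapsed = time.time() - start_time
logger.info(f"Crawled {len(keyword_list)} names in {elapsed:.3f} seconds ({elapsed / len(keyword_list):.3f} seconds per search)")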


Build A LinkedIn Profile Scraper

Time to build our profile scraper. This piece of our program is going to read profile-crawl.csv and then scrape each individual profile found in the crawl. We're going to use iterative building to add features, just like we did earlier.


Step 1: Create Simple Profile Data Parser

Once again, we'll start with a simple parsing function. Like we did earlier, we'll add retries and error handling, and set up our basic structure as well.

def scrape_profile(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

            logger.info(f"Status: {response.status_code}")
            soup = BeautifulSoup(response.text, "html.parser")
            head = soup.find("head")
            script = head.select_one("script[type='application/ld+json']")
            json_data_graph = json.loads(script.text)["@graph"]
            json_data = {}
            person_pipeline = DataPipeline(f"{row['name']}.csv")
            for element in json_data_graph:
                if element["@type"] == "Person":
                    json_data = element
                    break

            company = "n/a"
            company_profile = "n/a"
            job_title = "n/a"

            if "jobTitle" in json_data.keys() and isinstance(json_data["jobTitle"], list) and len(json_data["jobTitle"]) > 0:
                job_title = json_data["jobTitle"][0]

            has_company = "worksFor" in json_data.keys() and len(json_data["worksFor"]) > 0
            if has_company:
                company = json_data["worksFor"][0]["name"]
                has_company_url = "url" in json_data["worksFor"][0].keys()
                if has_company_url:
                    company_profile = json_data["worksFor"][0]["url"]

            has_interactions = "interactionStatistic" in json_data.keys()
            followers = 0
            if has_interactions:
                stats = json_data["interactionStatistic"]
                if stats["name"] == "Follows" and stats["@type"] == "InteractionCounter":
                    followers = stats["userInteractionCount"]

            profile_data = {
                "name": row["name"],
                "company": company,
                "company_profile": company_profile,
                "job_title": job_title,
                "followers": followers
            }
            print(profile_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")

    else:
        logger.info(f"Successfully parsed: {row['url']}")
  • First, we find the head of the page. The head contains all sorts of metadata.
  • We use head.select_one("script[type='application/ld+json']") to find our JSON blob located inside the head.
  • We load the JSON and iterate through the "@graph" array until we find an element whose "@type" is "Person". We use this "Person" element to extract our data (a simplified sample of this blob is sketched after this list).
  • We attempt to extract the following and set defaults just in case something is not found:
    • company: the company that a person works for.
    • company_profile: the company's LinkedIn profile.
    • job_title: the person's official job title.
    • followers: the number of other people following this person.
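For reference, here is a simplified, hypothetical sketch of the "Person" element our parser expects to find inside "@graph". The field names match the code above; the values are made up.

# Hypothetical, simplified example of the "Person" element; values are illustrative only
json_data = {
    "@type": "Person",
    "jobTitle": ["Example Job Title"],
    "worksFor": [
        {"name": "Example Company", "url": "https://www.linkedin.com/company/example-company"}
    ],
    "interactionStatistic": {
        "@type": "InteractionCounter",
        "name": "Follows",
        "userInteractionCount": 12345
    }
}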

Step 2: Loading URLs To Scrape

We need to load our CSV file into an array so we can work with it. We'll write another function called process_results().

This one is pretty simple. It reads our CSV file into an array of dict objects. Then it runs scrape_profile() on each profile from the array.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            scrape_profile(row, location, retries=retries)

You can see how everything fits together in our code below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
display_name: str = ""
url: str = ""
location: str = ""
companies: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def crawl_profiles(name, location, data_pipeline=None, retries=3):
first_name = name.split()[0]
last_name = name.split()[1]
url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")



soup = BeautifulSoup(response.text, "html.parser")
profile_cards = soup.find_all("div", class_="base-search-card__info")
for card in profile_cards:
href = card.parent.get("href").split("?")[0]
name = href.split("/")[-1].split("?")[0]
display_name = card.find("h3", class_="base-search-card__title").text
location = card.find("p", class_="people-search-card__location").text
companies = "n/a"
has_companies = card.find("span", class_="entity-list-meta__entities-list")
if has_companies:
companies = has_companies.text

search_data = SearchData(
name=name,
display_name=display_name,
url=href,
location=location,
companies=companies
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
crawl_profiles,
profile_list,
[location] * len(profile_list),
[data_pipeline] * len(profile_list),
[retries] * len(profile_list)
)


def scrape_profile(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")

logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
head = soup.find("head")
script = head.select_one("script[type='application/ld+json']")
json_data_graph = json.loads(script.text)["@graph"]
json_data = {}
person_pipeline = DataPipeline(f"{row['name']}.csv")
for element in json_data_graph:
if element["@type"] == "Person":
json_data = element
break

company = "n/a"
company_profile = "n/a"
job_title = "n/a"

if "jobTitle" in json_data.keys() and type(json_data["jobTitle"] == list) and len(json_data["jobTitle"]) > 0:
job_title = json_data["jobTitle"][0]

has_company = "worksFor" in json_data.keys() and len(json_data["worksFor"]) > 0
if has_company:
company = json_data["worksFor"][0]["name"]
has_company_url = "url" in json_data["worksFor"][0].keys()
if has_company_url:
company_profile = json_data["worksFor"][0]["url"]

has_interactions = "interactionStatistic" in json_data.keys()
followers = 0
if has_interactions:
stats = json_data["interactionStatistic"]
if stats["name"] == "Follows" and stats["@type"] == "InteractionCounter":
followers = stats["userInteractionCount"]

profile_data = {
"name": row["name"],
"company": company,
"company_profile": company_profile,
"job_title": job_title,
"followers": followers
}
print(profile_data)

success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
scrape_profile(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["bill gates", "elon musk"]

## Job Processes
filename = "profile-crawl.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

process_results(filename, LOCATION, retries=MAX_RETRIES)
  • scrape_profile() is used to scrape data from individual profile pages.
  • process_results() reads our CSV file and runs scrape_profile() on all of the profiles from our CSV.

Step 3: Storing the Scraped Data

We've already got most of the infrastructure we need to store this data. Earlier, we wrote a SearchData class and a DataPipeline. The DataPipeline is reusable as-is, but SearchData won't work for us here. We need another dataclass with different fields.

Take a look at our new dataclass. We'll call this one ProfileData.

@dataclass
class ProfileData:
    name: str = ""
    company: str = ""
    company_profile: str = ""
    job_title: str = ""
    followers: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Now, in our updated script, we open a DataPipeline from within our parsing function and pass ProfileData objects into it.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
display_name: str = ""
url: str = ""
location: str = ""
companies: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ProfileData:
name: str = ""
company: str = ""
company_profile: str = ""
job_title: str = ""
followers: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def crawl_profiles(name, location, data_pipeline=None, retries=3):
first_name = name.split()[0]
last_name = name.split()[1]
url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")



soup = BeautifulSoup(response.text, "html.parser")
profile_cards = soup.find_all("div", class_="base-search-card__info")
for card in profile_cards:
href = card.parent.get("href").split("?")[0]
name = href.split("/")[-1].split("?")[0]
display_name = card.find("h3", class_="base-search-card__title").text
location = card.find("p", class_="people-search-card__location").text
companies = "n/a"
has_companies = card.find("span", class_="entity-list-meta__entities-list")
if has_companies:
companies = has_companies.text

search_data = SearchData(
name=name,
display_name=display_name,
url=href,
location=location,
companies=companies
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
crawl_profiles,
profile_list,
[location] * len(profile_list),
[data_pipeline] * len(profile_list),
[retries] * len(profile_list)
)


def scrape_profile(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code != 200:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")

logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
head = soup.find("head")
script = head.select_one("script[type='application/ld+json']")
json_data_graph = json.loads(script.text)["@graph"]
json_data = {}
person_pipeline = DataPipeline(f"{row['name']}.csv")
for element in json_data_graph:
if element["@type"] == "Person":
json_data = element
break

company = "n/a"
company_profile = "n/a"
job_title = "n/a"

if "jobTitle" in json_data.keys() and type(json_data["jobTitle"] == list) and len(json_data["jobTitle"]) > 0:
job_title = json_data["jobTitle"][0]

has_company = "worksFor" in json_data.keys() and len(json_data["worksFor"]) > 0
if has_company:
company = json_data["worksFor"][0]["name"]
has_company_url = "url" in json_data["worksFor"][0].keys()
if has_company_url:
company_profile = json_data["worksFor"][0]["url"]

has_interactions = "interactionStatistic" in json_data.keys()
followers = 0
if has_interactions:
stats = json_data["interactionStatistic"]
if stats["name"] == "Follows" and stats["@type"] == "InteractionCounter":
followers = stats["userInteractionCount"]

profile_data = ProfileData (
name=row["name"],
company=company,
company_profile=company_profile,
job_title=job_title,
followers=followers
)
person_pipeline.add_data(profile_data)
person_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
scrape_profile(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["bill gates", "elon musk"]

## Job Processes
filename = "profile-crawl.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

process_results(filename, LOCATION, retries=MAX_RETRIES)
  • We use ProfileData to represent data scraped from individual profiles.
  • We pass our ProfileData objects directly into a DataPipeline just like we did with SearchData earlier in this project.

Step 4: Adding Concurrency

Now, we need to add concurrency. To accomplish this, we'll once again use ThreadPoolExecutor to run our parsing function. Our first argument is scrape_profile (the function we wish to call).

All other arguments to scrape_profile get passed in as arrays, just like before when we added multithreading.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                scrape_profile,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

Step 5: Bypassing Anti-Bots

Bypassing anti-bots will be relatively easy. We already have our proxy function, get_scrapeops_url(). We just need to put it in the right place. We're going to change a single line from our parsing function.

response = requests.get(get_scrapeops_url(url, location=location))

We have unlocked the power of proxy.

Our full code is available below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
display_name: str = ""
url: str = ""
location: str = ""
companies: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ProfileData:
name: str = ""
company: str = ""
company_profile: str = ""
job_title: str = ""
followers: int = 0



def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def crawl_profiles(name, location, data_pipeline=None, retries=3):
first_name = name.split()[0]
last_name = name.split()[1]
url = f"https://www.linkedin.com/pub/dir?firstName={first_name}&lastName={last_name}&trk=people-guest_people-search-bar_search-submit"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")



soup = BeautifulSoup(response.text, "html.parser")
profile_cards = soup.find_all("div", class_="base-search-card__info")
for card in profile_cards:
href = card.parent.get("href").split("?")[0]
name = href.split("/")[-1].split("?")[0]
display_name = card.find("h3", class_="base-search-card__title").text
location = card.find("p", class_="people-search-card__location").text
companies = "n/a"
has_companies = card.find("span", class_="entity-list-meta__entities-list")
if has_companies:
companies = has_companies.text

search_data = SearchData(
name=name,
display_name=display_name,
url=href,
location=location,
companies=companies
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_crawl(profile_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
crawl_profiles,
profile_list,
[location] * len(profile_list),
[data_pipeline] * len(profile_list),
[retries] * len(profile_list)
)


def scrape_profile(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(get_scrapeops_url(url, location=location))
if response.status_code != 200:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")

logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
head = soup.find("head")
script = head.select_one("script[type='application/ld+json']")
json_data_graph = json.loads(script.text)["@graph"]
json_data = {}
person_pipeline = DataPipeline(f"{row['name']}.csv")
for element in json_data_graph:
if element["@type"] == "Person":
json_data = element
break

company = "n/a"
company_profile = "n/a"
job_title = "n/a"

if "jobTitle" in json_data.keys() and type(json_data["jobTitle"] == list) and len(json_data["jobTitle"]) > 0:
job_title = json_data["jobTitle"][0]

has_company = "worksFor" in json_data.keys() and len(json_data["worksFor"]) > 0
if has_company:
company = json_data["worksFor"][0]["name"]
has_company_url = "url" in json_data["worksFor"][0].keys()
if has_company_url:
company_profile = json_data["worksFor"][0]["url"]

has_interactions = "interactionStatistic" in json_data.keys()
followers = 0
if has_interactions:
stats = json_data["interactionStatistic"]
if stats["name"] == "Follows" and stats["@type"] == "InteractionCounter":
followers = stats["userInteractionCount"]

profile_data = ProfileData(
name=row["name"],
company=company,
company_profile=company_profile,
job_title=job_title,
followers=followers
)
person_pipeline.add_data(profile_data)
person_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}, retries left: {retries-tries}")
tries += 1

if not success:
raise Exception(f"Max Retries exceeded: {retries}")

else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_profile,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["bill gates", "elon musk"]

## Job Processes
filename = "profile-crawl.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Time for our full production run. We're going to use the same setup as earlier: 5 threads crawling 2 keywords. Feel free to change any of the following (a sample configuration is sketched after the list):

  • MAX_RETRIES
  • MAX_THREADS
  • LOCATION
  • keyword_list
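
For example, a run with more threads, a different proxy location, and an extra keyword only requires editing the constants in the main block. The values below are purely illustrative; the calls themselves match the full script above.

MAX_RETRIES = 3
MAX_THREADS = 10      # example: bump concurrency from 5 to 10
LOCATION = "uk"       # example: route requests through UK-based proxies
keyword_list = ["bill gates", "elon musk", "jeff bezos"]  # example keyword list

filename = "profile-crawl.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_crawl(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)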

Remember, our earlier crawl took 7.167 seconds. The crawl portion of this run was probably slightly different, but we'll assume it was close to the same. You can see a screenshot of our full results below.

Scraper Results Terminal

This time around, we generated an initial crawl report with 67 results. The full run took 471.032 seconds. 471.032 - 7.167 = 463.865 seconds. 463.865 seconds / 67 results = 6.923 seconds per result.

This is a good deal slower per page than our initial crawl, but that's expected: as we bombard LinkedIn with requests through the proxy, LinkedIn regularly blocks proxy IP addresses and our requests get re-routed through other proxy servers. Considering everything going on under the hood, these results are still great.


Legal and Ethical Considerations

In many court cases, it has been decided that scraping the public web is perfectly legal. In this tutorial, we only scraped publicly available data from LinkedIn. Scraping private data (data behind a login) is a completely different story, and it subjects you to a completely different set of rules and regulations.

Although our scraping job here was completely legal, we definitely violated LinkedIn's terms of service and robots.txt. You can view their terms here and their robots.txt here.

It's important to note that LinkedIn has strict terms of service regarding data scraping, and scraping LinkedIn profiles without permission can lead to legal issues, including being banned from the platform.

Always ensure compliance with LinkedIn's policies and consider using official APIs or getting explicit permission for large-scale data extraction.

If you're unsure about the legality of your own scraper, consult an attorney.


Conclusion

We've now finished one of the most notoriously difficult scraping tasks: LinkedIn profiles. The ScrapeOps Proxy got us past LinkedIn's anti-bots so we could pull the data we needed. You should now have a solid grasp of parsing, data storage, concurrency, and proxy integration.

You can dig deeper into the tech we used by clicking the links below.


More Python Web Scraping Guides

Here at ScrapeOps, we've always got something for you. Whether you're just learning how to code, or you're a seasoned dev, you can gain something from our tutorials.

Check out our Python Web Scraping Playbook. If you want to learn how to scrape another tricky site, check out the links below!