Skip to main content

Scrape Pinterest With Requests and BeautifulSoup

How to Scrape Pinterest With Requests and BeautifulSoup

For years, Pinterest has been the go-to for all things creative on the internet. Whether you're looking for interesting recipes, decorative ideas, or anything else, Pinterest is a great place to go! Along with all of this, Pinterest is also a social network. This means that we can scrape valuable data such as account names, followers and more.

In this guide, we'll go over the following topics:


TLDR - How to Scrape Pinterest

If you're looking for a Pinterest scraper and you don't have time to read the article, we've got one for you right here and ready to go.

To use this code, create a config.json file with your "api_key" and place it in the same folder as this scraper. At that point, it's ready to go!!!

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url`` on our behalf.

    The query string carries the module-level ``API_KEY``, the target
    ``url``, the ``country`` to route the request through (``location``)
    and ``wait=2000``, which the article describes as asking ScrapeOps to
    wait two seconds for the page to render.
    """
    query_string = urlencode(
        {
            "api_key": API_KEY,
            "url": url,
            "country": location,
            "wait": 2000,
        }
    )
    return f"https://proxy.scrapeops.io/v1/?{query_string}"


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One search-result card: its text, its pin URL and its image URL.

    After construction every string field is stripped of surrounding
    whitespace, and any field that ends up empty is replaced with the
    placeholder ``"No <field name>"``.
    """

    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and fill empty ones with a placeholder."""
        # The loop variable is not called ``field`` so it does not shadow
        # ``dataclasses.field`` imported at the top of the file.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only input also gets the placeholder
            # instead of being stored as an empty string.
            cleaned = value.strip()
            setattr(self, data_field.name, cleaned or f"No {data_field.name}")

class DataPipeline:
    """Buffer scraped items and append them to a CSV file in batches.

    Items are de-duplicated on their ``name`` attribute. Once the queue
    holds ``storage_queue_limit`` items it is flushed to ``csv_filename``;
    ``close_pipeline`` flushes whatever is left.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # True only while save_to_csv is writing to the file
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        Items must be dataclass instances; the first one fixes the columns.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write, so the "file open" flag must stay False
            return

        self.csv_file_open = True
        try:
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(
                self.csv_filename, mode="a", newline="", encoding="utf-8"
            ) as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset the flag even when the write fails, otherwise later
            # flushes from add_data would be blocked for good.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before, else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when full."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        queue_full = len(self.storage_queue) >= self.storage_queue_limit
        if queue_full and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Flush any items still waiting in the queue."""
        # Imported here because the file's import block does not bring in
        # ``time``; without it the sleep below raised NameError.
        import time

        if self.csv_file_open:
            # Presumably gives an in-progress write time to finish
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Fetch one Pinterest search page through ScrapeOps and store each pin.

    Args:
        keyword: Search phrase; it is URL-encoded into the ``q`` parameter.
        location: Country code forwarded to ``get_scrapeops_url``.
        data_pipeline: Object whose ``add_data`` method receives one
            ``SearchData`` per result card. When ``None`` the results are
            only logged.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: When every attempt failed.
    """
    # urlencode turns spaces into "+" exactly as the manual replace did, and
    # also escapes characters such as "&" or "#" that would break the query.
    search_url = "https://www.pinterest.com/search/pins/?" + urlencode(
        {"q": keyword, "rs": "typed"}
    )
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The proxy URL embeds the API key, so it is deliberately not printed
            scrapeops_proxy_url = get_scrapeops_url(search_url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {search_url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")

            for div_card in soup.find_all("div"):
                # Each search result is a div carrying data-grid-item
                if not div_card.get("data-grid-item"):
                    continue

                a_element = div_card.find("a")
                img = div_card.find("img")
                if a_element is None or img is None:
                    # One malformed card should not fail the whole page
                    logger.warning("Skipping result card without a link or image")
                    continue

                # The pin URL gets its own variable; the original reused
                # ``url`` and so lost the search URL for retries and logs.
                search_data = SearchData(
                    name=div_card.text,
                    url=f"https://pinterest.com{a_element['href']}",
                    image=img["src"],
                )

                if data_pipeline is not None:
                    data_pipeline.add_data(search_data)
                else:
                    logger.info(f"Scraped: {search_data}")

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Without this increment a failing page was retried forever
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def process_pin(row, location, retries=3):
    """Fetch one pin page and print the details parsed from it.

    Args:
        row: Mapping with a ``"url"`` key holding the pin URL.
        location: Accepted but not used by this version of the function.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: When no attempt succeeded.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request lives inside the try so connection errors are
            # retried as well, instead of escaping the loop on first failure.
            response = requests.get(url)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            # If an expected element is missing, select_one returns None and
            # the attribute access below raises, which triggers a retry.
            main_card = soup.select_one("div[data-test-id='CloseupDetails']")

            website = "n/a"
            has_website = main_card.select_one("span[style='text-decoration: underline;']")
            if has_website:
                website = f"https://{has_website.text}"

            # The rating is the number of full-star elements on the card
            star_divs = main_card.select("div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.select_one("div[data-test-id='follower-count']")

            account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_all("div")
            account_name = nested_divs[0].get("title")
            # What remains after removing the name and the " followers"
            # suffix from the profile text is the follower count.
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
            img = img_container.find("img").get("src")

            pin_data = {
                "name": account_name,
                "website": website,
                "stars": stars,
                "follower_count": follower_count,
                "image": img
            }

            print(pin_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    """Run ``process_pin`` on every row of ``csv_file``, one row at a time.

    Args:
        csv_file: Path of the CSV file written by the crawler.
        location: Forwarded to ``process_pin``.
        max_threads: Accepted but not used here; rows are handled sequentially.
        retries: Forwarded to ``process_pin``.
    """
    logger.info(f"processing {csv_file}")
    # The crawler writes this file as UTF-8; read it back the same way so
    # the platform default encoding cannot garble or reject pin titles.
    with open(csv_file, newline="", encoding="utf-8") as file:
        rows = list(csv.DictReader(file))

    for row in rows:
        process_pin(row, location, retries=retries)


if __name__ == "__main__":
    # Run settings
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Crawl: one CSV file per keyword
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_path = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_path)
        scrape_search_results(
            keyword,
            LOCATION,
            data_pipeline=crawl_pipeline,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_path)
    logger.info("Crawl complete.")

    ## Scrape: visit every pin listed in each CSV file
    for file in aggregate_files:
        process_results(
            file,
            LOCATION,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )

Feel free to tweak it as you need. You can change any of the following:

  • MAX_RETRIES: This parameter sets the maximum number of attempts the script will make to fetch data from a URL if the initial request fails.
  • MAX_THREADS: This parameter sets the maximum number of threads to use for processing results concurrently. This can speed up the processing of multiple pins or search results.
  • LOCATION: This parameter sets the geographical location from which the requests are made. It can affect the content returned by the website due to region-specific restrictions or differences.
  • keyword_list: This list contains the keywords for which you want to scrape Pinterest search results.

How To Architect Our Pinterest Scraper

Our scraper is going to utilize parsing, data storage, concurrency, and proxy integration. When we use a headless browser such as Selenium or Puppeteer, we have the ability to interact with the page and render JavaScript.

With Requests/BeautifulSoup, we don't get this luxury, so we'll be using the ScrapeOps Headless Browser to compensate for that.

In this tutorial, we'll be building both a scraper and a crawler.

Our project will utilize:

  1. Parsing: to extract the important data from Pinterest.
  2. Data Storage: To store our data for later review and also to feed information into our scraper.
  3. Concurrency: to process multiple pages simultaneously and efficiently.
  4. Proxy Integration: Pinterest is notoriously difficult to access programmatically, so we'll be using the ScrapeOps Proxy API.

Understanding How To Scrape Pinterest

Step 1: How To Request Pinterest Pages

When we perform a search on Pinterest, we're making a GET request to the server. A GET request includes our base URL and some additional parameters. Feel free to take a look at the screenshot below; it's a search for the keyword "grilling".

If you look at the address bar, our URL is:

https://www.pinterest.com/search/pins/?q=grilling&rs=typed
  • Our base URL is https://www.pinterest.com/search/pins/ and our query parameters are q=grilling&rs=typed.
  • rs=typed is a standard param that gets added to the url when you perform a search on Pinterest.
  • q=grilling contains the actual keywords we're searching for (in this case, "grilling").

Pinterest Search Results

Individual pages on Pinterest are just a simple number. Here is a pin page from the search we performed above. As you can see, the URL is pretty simple:

https://www.pinterest.com/pin/45176802505307132/

https://www.pinterest.com/pin/ tells the server that we want a pin. 45176802505307132 represents the number of the pin.

Pin Page


Step 2: How To Extract Data From Pinterest Results and Pages

There are a couple of important things to know when extracting data from a Pinterest page.

  1. First, our content is all loaded via JavaScript, so we won't be able to pull the page content until it's been rendered. To do this, we'll be passing the wait argument into the ScrapeOps API.

The ScrapeOps API actually runs a headless browser inside of it. When we use the wait param, this tells the ScrapeOps server to wait a certain amount of time for the content to render and then send the page results back to us.

  2. Once we've got our content, it's nested pretty badly inside the page. Even worse, Pinterest uses dynamic CSS classes and does not use traditional CSS for the page layout.

If you look below, you'll see exactly what I'm talking about.

Results Page Inspection

Now, let's take a look at the pin page. Most of our important pieces of data contain the trait data-test-id. When scraping the pin page, we'll be using data-test-id to find most of our relevant information.

Pin Page Inspection


Step 3: Geolocated Data

To scrape Pinterest, we'll be using the country param to the ScrapeOps API as well. This parameter allows us to be routed through a server in whichever country we choose.

  • For instance, if we want to appear in the US, we'd set our country to "us".
  • If we want to appear in the UK, we can set our country to "uk".

During testing, this parameter was incredibly important. Occasionally you can even get blocked when using a proxy and this happened during testing. The simple fix was to change our country from the US to the UK.

If you are following along and have issues with your scrape even though it worked successfully earlier, first try changing your location with the ScrapeOps API, this did the trick for us.


Setting Up Our Pinterest Scraper Project

Let's get started. You can run the following commands to get setup.

Create a New Project Folder

mkdir pinterest-scraper

cd pinterest-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build A Pinterest Search Crawler

The first scraper we build is going to be our crawler. Let's get started! Our crawler is going to do the following:

  1. Parsing: to pull the important data from the page.
  2. Data Storage: to safely store our data for later use.
  3. Proxy: to get past anti-bots and any other potential roadblocks we may encounter.

If our crawler utilizes these things, we can:

  • Fetch a page
  • Extract the results
  • Save the results
  • Bypass any potential anti-bots or other blockers

Step 1: Create Simple Search Data Parser

Let's get started by building a parser. The goal of our parser is to fetch a page, and then extract information from that website.

The code structure below is relatively simple. After our imports, we read our API key with the script below:

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

  • First, we declare our API key as a variable, then after reading config.json, we assign the key from the file to our API_KEY variable.
  • Then, we create a function, scrape_search_results(), which does the parsing.
  • As long as we have retries left and the operation has not succeeded, we try to get the page and then pull the information from it.
    • If the operation fails, we retry until it either succeeds or runs out of retries.
    • If we completely run out of retries, we allow the scraper to crash and print an error message.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



def scrape_search_results(keyword, location, retries=3):
    """Fetch one Pinterest search page and print each result that is found.

    Args:
        keyword: Search phrase; it is URL-encoded into the ``q`` parameter.
        location: Accepted but not used by this version of the function.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: When every attempt failed.
    """
    # urlencode turns spaces into "+" exactly as the manual replace did, and
    # also escapes characters such as "&" or "#" that would break the query.
    search_url = "https://www.pinterest.com/search/pins/?" + urlencode(
        {"q": keyword, "rs": "typed"}
    )
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(search_url)
            logger.info(f"Recieved [{response.status_code}] from: {search_url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")

            for div_card in soup.find_all("div"):
                # Each search result is a div carrying data-grid-item
                if not div_card.get("data-grid-item"):
                    continue

                a_element = div_card.find("a")
                img = div_card.find("img")
                if a_element is None or img is None:
                    # One malformed card should not fail the whole page
                    logger.warning("Skipping result card without a link or image")
                    continue

                # The pin URL is stored under its own key/value; the original
                # reused ``url`` and so lost the search URL for retries and logs.
                search_data = {
                    "name": div_card.text,
                    "url": f"https://pinterest.com{a_element['href']}",
                    "image": img["src"]
                }

                print(search_data)

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Without this increment a failing page was retried forever
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")





if __name__ == "__main__":
    # Run settings
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Crawl each keyword in turn
    for keyword in keyword_list:
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

    logger.info("Crawl complete.")

The code above does the following:

  • After finding all the divs, we check each one with div_card.get("data-grid-item"). Each result in our search is a data-grid-item.
  • We then find each link element with div_card.find("a") and we extract it with url = f"https://pinterest.com{a_element['href']}".
  • To find our image, we use img = div_card.find("img") and we then pull the link to the image with img_url = img["src"].

Step 2: Storing the Scraped Data

Now that we're getting the proper information, we need to be able to store our data. We'll be using two separate classes for our data: SearchData and DataPipeline.

  1. SearchData is a class built specifically to hold our data.
  2. DataPipeline is a pipeline to a CSV file. This class filters out duplicates from hitting our CSV and then stores the CSV safely.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One search-result card: its text, its pin URL and its image URL.

    After construction every string field is stripped of surrounding
    whitespace, and any field that ends up empty is replaced with the
    placeholder ``"No <field name>"``.
    """

    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and fill empty ones with a placeholder."""
        # The loop variable is not called ``field`` so it does not shadow
        # ``dataclasses.field`` imported at the top of the file.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only input also gets the placeholder
            # instead of being stored as an empty string.
            cleaned = value.strip()
            setattr(self, data_field.name, cleaned or f"No {data_field.name}")

class DataPipeline:
    """Buffer scraped items and append them to a CSV file in batches.

    Items are de-duplicated on their ``name`` attribute. Once the queue
    holds ``storage_queue_limit`` items it is flushed to ``csv_filename``;
    ``close_pipeline`` flushes whatever is left.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # True only while save_to_csv is writing to the file
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        Items must be dataclass instances; the first one fixes the columns.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write, so the "file open" flag must stay False
            return

        self.csv_file_open = True
        try:
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(
                self.csv_filename, mode="a", newline="", encoding="utf-8"
            ) as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset the flag even when the write fails, otherwise later
            # flushes from add_data would be blocked for good.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before, else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when full."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        queue_full = len(self.storage_queue) >= self.storage_queue_limit
        if queue_full and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Flush any items still waiting in the queue."""
        # Imported here because the file's import block does not bring in
        # ``time``; without it the sleep below raised NameError.
        import time

        if self.csv_file_open:
            # Presumably gives an in-progress write time to finish
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Fetch one Pinterest search page and store each pin that is found.

    Args:
        keyword: Search phrase; it is URL-encoded into the ``q`` parameter.
        location: Accepted but not used by this version of the function.
        data_pipeline: Object whose ``add_data`` method receives one
            ``SearchData`` per result card. When ``None`` the results are
            only logged.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: When every attempt failed.
    """
    # urlencode turns spaces into "+" exactly as the manual replace did, and
    # also escapes characters such as "&" or "#" that would break the query.
    search_url = "https://www.pinterest.com/search/pins/?" + urlencode(
        {"q": keyword, "rs": "typed"}
    )
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(search_url)
            logger.info(f"Recieved [{response.status_code}] from: {search_url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")

            for div_card in soup.find_all("div"):
                # Each search result is a div carrying data-grid-item
                if not div_card.get("data-grid-item"):
                    continue

                a_element = div_card.find("a")
                img = div_card.find("img")
                if a_element is None or img is None:
                    # One malformed card should not fail the whole page
                    logger.warning("Skipping result card without a link or image")
                    continue

                # The pin URL gets its own variable; the original reused
                # ``url`` and so lost the search URL for retries and logs.
                search_data = SearchData(
                    name=div_card.text,
                    url=f"https://pinterest.com{a_element['href']}",
                    image=img["src"],
                )

                if data_pipeline is not None:
                    data_pipeline.add_data(search_data)
                else:
                    logger.info(f"Scraped: {search_data}")

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Without this increment a failing page was retried forever
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")





if __name__ == "__main__":
    # Run settings
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Crawl: one CSV file per keyword
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_path = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_path)
        scrape_search_results(
            keyword,
            LOCATION,
            data_pipeline=crawl_pipeline,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_path)
    logger.info("Crawl complete.")
  • After we've pulled our data from the page, we turn it into a SearchData object.
  • Next, we add search_data to the pipeline with data_pipeline.add_data(search_data).
  • Once our operation has finished, we close the pipeline.

Step 3: Bypassing Anti-Bots

At this point, the crawler is more or less finished, but first, we need to add anti-bot support.

Typically, we would not have the wait parameter in the code below, but on Pinterest, all of our content is dynamically generated, so "wait": 2000 tells the ScrapeOps server to wait 2 seconds for our content to render and then it sends us the page.

def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url`` on our behalf.

    The query string carries the module-level ``API_KEY``, the target
    ``url``, the ``country`` to route the request through (``location``)
    and ``wait=2000``, which the article describes as asking ScrapeOps to
    wait two seconds for the page to render.
    """
    query_string = urlencode(
        {
            "api_key": API_KEY,
            "url": url,
            "country": location,
            "wait": 2000,
        }
    )
    return f"https://proxy.scrapeops.io/v1/?{query_string}"

Take a look at our overall script now:

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url`` on our behalf.

    The query string carries the module-level ``API_KEY``, the target
    ``url``, the ``country`` to route the request through (``location``)
    and ``wait=2000``, which the article describes as asking ScrapeOps to
    wait two seconds for the page to render.
    """
    query_string = urlencode(
        {
            "api_key": API_KEY,
            "url": url,
            "country": location,
            "wait": 2000,
        }
    )
    return f"https://proxy.scrapeops.io/v1/?{query_string}"


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One search-result card: its text, its pin URL and its image URL.

    After construction every string field is stripped of surrounding
    whitespace, and any field that ends up empty is replaced with the
    placeholder ``"No <field name>"``.
    """

    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and fill empty ones with a placeholder."""
        # The loop variable is not called ``field`` so it does not shadow
        # ``dataclasses.field`` imported at the top of the file.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only input also gets the placeholder
            # instead of being stored as an empty string.
            cleaned = value.strip()
            setattr(self, data_field.name, cleaned or f"No {data_field.name}")

class DataPipeline:
    """Buffer scraped items and append them to a CSV file in batches.

    Items are de-duplicated on their ``name`` attribute. Once the queue
    holds ``storage_queue_limit`` items it is flushed to ``csv_filename``;
    ``close_pipeline`` flushes whatever is left.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # True only while save_to_csv is writing to the file
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        Items must be dataclass instances; the first one fixes the columns.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write, so the "file open" flag must stay False
            return

        self.csv_file_open = True
        try:
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(
                self.csv_filename, mode="a", newline="", encoding="utf-8"
            ) as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset the flag even when the write fails, otherwise later
            # flushes from add_data would be blocked for good.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before, else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when full."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        queue_full = len(self.storage_queue) >= self.storage_queue_limit
        if queue_full and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Flush any items still waiting in the queue."""
        # Imported here because the file's import block does not bring in
        # ``time``; without it the sleep below raised NameError.
        import time

        if self.csv_file_open:
            # Presumably gives an in-progress write time to finish
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Fetch one Pinterest search page through ScrapeOps and store each pin.

    Args:
        keyword: Search phrase; it is URL-encoded into the ``q`` parameter.
        location: Country code forwarded to ``get_scrapeops_url``.
        data_pipeline: Object whose ``add_data`` method receives one
            ``SearchData`` per result card. When ``None`` the results are
            only logged.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: When every attempt failed.
    """
    # urlencode turns spaces into "+" exactly as the manual replace did, and
    # also escapes characters such as "&" or "#" that would break the query.
    search_url = "https://www.pinterest.com/search/pins/?" + urlencode(
        {"q": keyword, "rs": "typed"}
    )
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(search_url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {search_url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")

            for div_card in soup.find_all("div"):
                # Each search result is a div carrying data-grid-item
                if not div_card.get("data-grid-item"):
                    continue

                a_element = div_card.find("a")
                img = div_card.find("img")
                if a_element is None or img is None:
                    # One malformed card should not fail the whole page
                    logger.warning("Skipping result card without a link or image")
                    continue

                # The pin URL gets its own variable; the original reused
                # ``url`` and so lost the search URL for retries and logs.
                search_data = SearchData(
                    name=div_card.text,
                    url=f"https://pinterest.com{a_element['href']}",
                    image=img["src"],
                )

                if data_pipeline is not None:
                    data_pipeline.add_data(search_data)
                else:
                    logger.info(f"Scraped: {search_data}")

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Without this increment a failing page was retried forever
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")





if __name__ == "__main__":
    # Run settings
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Crawl: one CSV file per keyword
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_path = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_path)
        scrape_search_results(
            keyword,
            LOCATION,
            data_pipeline=crawl_pipeline,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_path)
    logger.info("Crawl complete.")

In this code, we parse the search results. Afterward, we store them in a CSV file. Finally we do all of this with a proxy. Our proxy is incredibly important, it does the following:

  • Penetrates any systems that may block us.
  • Waits 2 seconds for the page to render.
  • Sends us the page after it has loaded.

Step 4: Production Run

Now that we've got a working crawler, it's time to run it in production. Take a look at our main.

if __name__ == "__main__":
    # Run settings
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Crawl: one CSV file per keyword
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        csv_path = f"{filename}.csv"

        crawl_pipeline = DataPipeline(csv_filename=csv_path)
        scrape_search_results(
            keyword,
            LOCATION,
            data_pipeline=crawl_pipeline,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_path)
    logger.info("Crawl complete.")

We do a search for "grilling". Feel free to change any of the constants yourself and tweak the code, just remember, we don't have actual concurrency yet, this will be added in when we're scraping the individual posts that we find with the crawler.

Here are the results from our crawler:

Crawler Results

We crawled "grilling" in 7.331 seconds. Results may vary based on the location of your server and the quality of your internet connection.


Build A Pinterest Scraper

Next, it's time to build our Pinterest scraper. The scraper needs to be able to do the following:

  1. Parse the information from a pin.
  2. Read the rows from the CSV file.
  3. Store the data we extracted when parsing.
  4. Perform all these actions concurrently.
  5. Integrate with the ScrapeOps Proxy API

Step 1: Create Simple Data Parser

Let's get started building our pin parser. This parser needs to lookup a pin, and then pull information from that pin. The code below contains our process_pin() function.

Similar to our crawler, we use the retries and success model. While we still have retries left and the operation hasn't succeeded, we find the main card and pull relevant information from it.

def process_pin(row, location, retries=3):
    """Fetch one pin page and print the details parsed from it.

    Args:
        row: Mapping with a ``"url"`` key holding the pin URL.
        location: Accepted but not used by this version of the function.
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: When no attempt succeeded.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request lives inside the try so connection errors are
            # retried as well, instead of escaping the loop on first failure.
            response = requests.get(url)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            # If an expected element is missing, select_one returns None and
            # the attribute access below raises, which triggers a retry.
            main_card = soup.select_one("div[data-test-id='CloseupDetails']")

            website = "n/a"
            has_website = main_card.select_one("span[style='text-decoration: underline;']")
            if has_website:
                website = f"https://{has_website.text}"

            # The rating is the number of full-star elements on the card
            star_divs = main_card.select("div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.select_one("div[data-test-id='follower-count']")

            account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_all("div")
            account_name = nested_divs[0].get("title")
            # What remains after removing the name and the " followers"
            # suffix from the profile text is the follower count.
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
            img = img_container.find("img").get("src")

            pin_data = {
                "name": account_name,
                "website": website,
                "stars": stars,
                "follower_count": follower_count,
                "image": img
            }

            print(pin_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    logger.info(f"Successfully parsed: {row['url']}")

When we're attempting to parse the pin, we do the following:

  • Find the main_card using its CSS selector: main_card = soup.select_one("div[data-test-id='CloseupDetails']").
  • main_card.select("div[data-test-id='rating-star-full']") finds all of the star elements on the page. We then count the stars with stars = len(star_divs).
  • Find the div that holds the account information with account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']").
  • nested_divs[0].get("title") finds our account name.
  • We remove our account_name and other irrelevant text with profile_info.text.replace(account_name, "").replace(" followers", "")

Step 2: Loading URLs To Scrape

Now, we need to load our urls. We can't look our pins up and parse them if we can't load the urls from the CSV file. It's time to update our overall code to add the parsing function above and to read the CSV file.

Let's start with our process_results() function:

def process_results(csv_file, location, max_threads=5, retries=3):
    """Run ``process_pin`` on every row of ``csv_file``, one row at a time.

    Args:
        csv_file: Path of the CSV file written by the crawler.
        location: Forwarded to ``process_pin``.
        max_threads: Accepted but not used here; rows are handled sequentially.
        retries: Forwarded to ``process_pin``.
    """
    logger.info(f"processing {csv_file}")
    # The crawler writes this file as UTF-8; read it back the same way so
    # the platform default encoding cannot garble or reject pin titles.
    with open(csv_file, newline="", encoding="utf-8") as file:
        rows = list(csv.DictReader(file))

    for row in rows:
        process_pin(row, location, retries=retries)

Now, take a look at the overall code to see how it all fits together.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy URL that fetches ``url`` on our behalf.

    The query string carries the module-level ``API_KEY``, the target
    ``url``, the ``country`` to route the request through (``location``)
    and ``wait=2000``, which the article describes as asking ScrapeOps to
    wait two seconds for the page to render.
    """
    query_string = urlencode(
        {
            "api_key": API_KEY,
            "url": url,
            "country": location,
            "wait": 2000,
        }
    )
    return f"https://proxy.scrapeops.io/v1/?{query_string}"


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One search-result card: its text, its pin URL and its image URL.

    After construction every string field is stripped of surrounding
    whitespace, and any field that ends up empty is replaced with the
    placeholder ``"No <field name>"``.
    """

    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and fill empty ones with a placeholder."""
        # The loop variable is not called ``field`` so it does not shadow
        # ``dataclasses.field`` imported at the top of the file.
        for data_field in fields(self):
            value = getattr(self, data_field.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only input also gets the placeholder
            # instead of being stored as an empty string.
            cleaned = value.strip()
            setattr(self, data_field.name, cleaned or f"No {data_field.name}")

class DataPipeline:
    """Buffer scraped items and append them to a CSV file in batches.

    Items are de-duplicated on their ``name`` attribute. Once the queue
    holds ``storage_queue_limit`` items it is flushed to ``csv_filename``;
    ``close_pipeline`` flushes whatever is left.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # True only while save_to_csv is writing to the file
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued item to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        Items must be dataclass instances; the first one fixes the columns.
        """
        data_to_save = list(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write, so the "file open" flag must stay False
            return

        self.csv_file_open = True
        try:
            keys = [data_field.name for data_field in fields(data_to_save[0])]
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(
                self.csv_filename, mode="a", newline="", encoding="utf-8"
            ) as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(item) for item in data_to_save)
        finally:
            # Reset the flag even when the write fails, otherwise later
            # flushes from add_data would be blocked for good.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if ``input_data.name`` was seen before, else record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue ``scraped_data`` unless it is a duplicate; flush when full."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        queue_full = len(self.storage_queue) >= self.storage_queue_limit
        if queue_full and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Flush any items still waiting in the queue."""
        # Imported here because the file's import block does not bring in
        # ``time``; without it the sleep below raised NameError.
        import time

        if self.csv_file_open:
            # Presumably gives an in-progress write time to finish
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Fetch the Pinterest search page for `keyword` and queue one SearchData per result card.

    Args:
        keyword: Search phrase; spaces are replaced with "+" for the query string.
        location: Forwarded to get_scrapeops_url().
        data_pipeline: Object whose add_data() receives each SearchData.
        retries: Number of additional attempts allowed after a failed one.

    Raises:
        Exception: When no attempt succeeded.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The proxy URL embeds the API key, so it is not echoed to stdout.
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")

            for div_card in soup.find_all("div"):
                # Only divs carrying a data-grid-item attribute are result cards.
                if not div_card.get("data-grid-item"):
                    continue

                title = div_card.text
                a_element = div_card.find("a")
                # Kept apart from `url`: overwriting it would make a retry request
                # the last pin instead of the search page.
                pin_url = f"https://pinterest.com{a_element['href']}"
                img_url = div_card.find("img")["src"]

                search_data = SearchData(
                    name=title,
                    url=pin_url,
                    image=img_url
                )
                data_pipeline.add_data(search_data)

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the attempt; without this a persistent failure loops forever.
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def process_pin(row, location, retries=3):
    """Fetch one pin page and print the details scraped from it.

    Args:
        row: CSV row dict; its "url" entry is the pin page to fetch.
        location: Not used in this version (the page is requested directly).
        retries: Number of additional attempts allowed after a failed one.

    Raises:
        Exception: When no attempt succeeded.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Requested inside the try so connection errors are retried as well.
            response = requests.get(url)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            main_card = soup.select_one("div[data-test-id='CloseupDetails']")

            website = "n/a"
            has_website = main_card.select_one("span[style='text-decoration: underline;']")
            if has_website:
                website = f"https://{has_website.text}"

            # The rating is the number of full-star elements on the card.
            star_divs = main_card.select("div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.select_one("div[data-test-id='follower-count']")

            account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_all("div")
            account_name = nested_divs[0].get("title")
            # What is left after removing the account name and " followers" is the count.
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
            img = img_container.find("img").get("src")

            pin_data = {
                "name": account_name,
                "website": website,
                "stars": stars,
                "follower_count": follower_count,
                "image": img
            }

            print(pin_data)
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and scrape every pin it lists, one row at a time.

    Args:
        csv_file: Path of the CSV file to read; each row is handed to process_pin().
        location: Passed through to process_pin().
        max_threads: Accepted but not used here; rows are processed sequentially.
        retries: Passed through to process_pin().
    """
    logger.info(f"processing {csv_file}")
    # The pipeline writes its CSV files as UTF-8, so read them back the same way
    # instead of relying on the platform's default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_pin(row, location, retries=retries)


if __name__ == "__main__":
    # Entry point: crawl the search results of each keyword into "<keyword>.csv",
    # then process every row of those files.

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

As before, we need to store our scraped data. We need to create another dataclass, PinData. Just like SearchData, the job of PinData is to simply hold data. We then go ahead and pass this into a DataPipeline.

Take a look, it's almost identical to SearchData.

@dataclass
class PinData:
    """Details scraped from one pin page.

    After construction every string field is stripped of surrounding whitespace,
    and a field that ends up empty is replaced by the placeholder "No <field name>".
    Non-string fields such as `stars` are left untouched.
    """
    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and substitute a placeholder for empty ones."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also receive the placeholder.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned if cleaned else f"No {spec.name}")

Now, let's update our script.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

# Load the API key from config.json (path is relative to the current working directory).
with open("config.json", "r", encoding="utf-8") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    """Build the ScrapeOps proxy URL that requests `url` on our behalf.

    Args:
        url: Target page to fetch through the proxy.
        location: Value sent as the proxy's "country" parameter.

    Returns:
        The proxy endpoint with the payload URL-encoded into its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        # Wait requested from the proxy before it returns the page (presumably milliseconds).
        "wait": 2000,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


## Logging
# Configure logging at INFO level and create the module-level logger the functions below use.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One search-result card: its title text, pin URL and image URL.

    After construction every string field is stripped of surrounding whitespace,
    and a field that ends up empty is replaced by the placeholder "No <field name>".
    """
    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and substitute a placeholder for empty ones."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also receive the placeholder.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned if cleaned else f"No {spec.name}")

@dataclass
class PinData:
    """Details scraped from one pin page.

    After construction every string field is stripped of surrounding whitespace,
    and a field that ends up empty is replaced by the placeholder "No <field name>".
    Non-string fields such as `stars` are left untouched.
    """
    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and substitute a placeholder for empty ones."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also receive the placeholder.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned if cleaned else f"No {spec.name}")


class DataPipeline:
    """Queue scraped dataclass records, drop repeated names and append them to a CSV file.

    Records are buffered in `storage_queue` and written out whenever the queue
    reaches `storage_queue_limit`, and once more by `close_pipeline()`.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        # Names are only tested for membership, so a set keeps the duplicate check O(1).
        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append everything in the queue to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            # Column names come from the first record's dataclass fields.
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag on every exit path (early return or error); otherwise
            # add_data() would never flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with this name was already seen; otherwise remember it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.add(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a record unless its name is a duplicate, flushing when the queue is full."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write any records still waiting in the queue."""
        if self.csv_file_open:
            # Imported here because `time` is not among the script's top-level imports.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Fetch the Pinterest search page for `keyword` and queue one SearchData per result card.

    Args:
        keyword: Search phrase; spaces are replaced with "+" for the query string.
        location: Forwarded to get_scrapeops_url().
        data_pipeline: Object whose add_data() receives each SearchData.
        retries: Number of additional attempts allowed after a failed one.

    Raises:
        Exception: When no attempt succeeded.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The proxy URL embeds the API key, so it is not echoed to stdout.
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")

            for div_card in soup.find_all("div"):
                # Only divs carrying a data-grid-item attribute are result cards.
                if not div_card.get("data-grid-item"):
                    continue

                title = div_card.text
                a_element = div_card.find("a")
                # Kept apart from `url`: overwriting it would make a retry request
                # the last pin instead of the search page.
                pin_url = f"https://pinterest.com{a_element['href']}"
                img_url = div_card.find("img")["src"]

                search_data = SearchData(
                    name=title,
                    url=pin_url,
                    image=img_url
                )
                data_pipeline.add_data(search_data)

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the attempt; without this a persistent failure loops forever.
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def process_pin(row, location, retries=3):
    """Fetch one pin page and save the scraped details to a CSV named after the row.

    Args:
        row: CSV row dict; "url" is the pin page to fetch and "name" names the output file.
        location: Not used in this version (the page is requested directly).
        retries: Number of additional attempts allowed after a failed one.

    Raises:
        Exception: When no attempt succeeded.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Requested inside the try so connection errors are retried as well.
            response = requests.get(url)
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            main_card = soup.select_one("div[data-test-id='CloseupDetails']")

            website = "n/a"
            has_website = main_card.select_one("span[style='text-decoration: underline;']")
            if has_website:
                website = f"https://{has_website.text}"

            # The rating is the number of full-star elements on the card.
            star_divs = main_card.select("div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.select_one("div[data-test-id='follower-count']")

            account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_all("div")
            account_name = nested_divs[0].get("title")
            # What is left after removing the account name and " followers" is the count.
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
            img = img_container.find("img").get("src")

            pin_data = PinData(
                name=account_name,
                website=website,
                stars=stars,
                follower_count=follower_count,
                image=img
            )

            # Each pin gets its own pipeline, and therefore its own CSV file.
            pin_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            pin_pipeline.add_data(pin_data)
            pin_pipeline.close_pipeline()

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and scrape every pin it lists, one row at a time.

    Args:
        csv_file: Path of the CSV file to read; each row is handed to process_pin().
        location: Passed through to process_pin().
        max_threads: Accepted but not used here; rows are processed sequentially.
        retries: Passed through to process_pin().
    """
    logger.info(f"processing {csv_file}")
    # The pipeline writes its CSV files as UTF-8, so read them back the same way
    # instead of relying on the platform's default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    for row in reader:
        process_pin(row, location, retries=retries)


if __name__ == "__main__":
    # Entry point: crawl the search results of each keyword into "<keyword>.csv",
    # then process every row of those files.

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Aside from our new class, here are the key differences you should notice:

  • We open a new DataPipeline for our PinData, pin_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") .
  • Instead of printing the dictionary like we did earlier, we construct a PinData object out of it.
  • We pass the PinData into our pipeline and then close the pipeline.

Step 4: Adding Concurrency

We've hit the point where we need to start thinking about performance. To achieve better performance, we need to add concurrency.

To do this, we're going to use ThreadPoolExecutor to add multithreading support to our scraper. Our MAX_THREADS constant will finally get used now.

def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and scrape its pins concurrently on a thread pool.

    Args:
        csv_file: Path of the CSV file to read; each row is handed to process_pin().
        location: Passed to process_pin() for every row.
        max_threads: Number of worker threads in the pool.
        retries: Passed to process_pin() for every row.
    """
    logger.info(f"processing {csv_file}")
    # The pipeline writes its CSV files as UTF-8, so read them back the same way
    # instead of relying on the platform's default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    # map() takes one iterable per process_pin() argument, hence the repeated lists.
    # Its results are never consumed, so an exception raised by process_pin() is not
    # re-raised here; leaving the `with` block waits for all submitted work to finish.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_pin,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )

Let's outline the different arguments to executor.map():

  • process_pin is the function that we're calling to run on multiple threads
  • reader is an array of dict objects that we read from the CSV file.
  • We then pass the location in as an array the length of the reader
  • We pass the retries in as an array as well

Step 5: Bypassing Anti-Bots

There is one final change we need to make to our scraper. Inside of process_pin(), we change the following line.

response = requests.get(get_scrapeops_url(url, location=location))

Here is our fully updated scraper:

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

# Load the API key from config.json (path is relative to the current working directory).
with open("config.json", "r", encoding="utf-8") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
    """Build the ScrapeOps proxy URL that requests `url` on our behalf.

    Args:
        url: Target page to fetch through the proxy.
        location: Value sent as the proxy's "country" parameter.

    Returns:
        The proxy endpoint with the payload URL-encoded into its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        # Wait requested from the proxy before it returns the page (presumably milliseconds).
        "wait": 2000,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


## Logging
# Configure logging at INFO level and create the module-level logger the functions below use.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
    """One search-result card: its title text, pin URL and image URL.

    After construction every string field is stripped of surrounding whitespace,
    and a field that ends up empty is replaced by the placeholder "No <field name>".
    """
    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and substitute a placeholder for empty ones."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also receive the placeholder.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned if cleaned else f"No {spec.name}")

@dataclass
class PinData:
    """Details scraped from one pin page.

    After construction every string field is stripped of surrounding whitespace,
    and a field that ends up empty is replaced by the placeholder "No <field name>".
    Non-string fields such as `stars` are left untouched.
    """
    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and substitute a placeholder for empty ones."""
        # Named `spec` so the imported dataclasses.field is not shadowed.
        for spec in fields(self):
            value = getattr(self, spec.name)
            if not isinstance(value, str):
                continue
            # Strip first so whitespace-only values also receive the placeholder.
            cleaned = value.strip()
            setattr(self, spec.name, cleaned if cleaned else f"No {spec.name}")


class DataPipeline:
    """Queue scraped dataclass records, drop repeated names and append them to a CSV file.

    Records are buffered in `storage_queue` and written out whenever the queue
    reaches `storage_queue_limit`, and once more by `close_pipeline()`.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        # Names are only tested for membership, so a set keeps the duplicate check O(1).
        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append everything in the queue to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            # Column names come from the first record's dataclass fields.
            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag on every exit path (early return or error); otherwise
            # add_data() would never flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with this name was already seen; otherwise remember it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.add(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a record unless its name is a duplicate, flushing when the queue is full."""
        if self.is_duplicate(scraped_data):
            return
        self.storage_queue.append(scraped_data)
        if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
            self.save_to_csv()

    def close_pipeline(self):
        """Write any records still waiting in the queue."""
        if self.csv_file_open:
            # Imported here because `time` is not among the script's top-level imports.
            import time
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Fetch the Pinterest search page for `keyword` and queue one SearchData per result card.

    Args:
        keyword: Search phrase; spaces are replaced with "+" for the query string.
        location: Forwarded to get_scrapeops_url().
        data_pipeline: Object whose add_data() receives each SearchData.
        retries: Number of additional attempts allowed after a failed one.

    Raises:
        Exception: When no attempt succeeded.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The proxy URL embeds the API key, so it is not echoed to stdout.
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")

            for div_card in soup.find_all("div"):
                # Only divs carrying a data-grid-item attribute are result cards.
                if not div_card.get("data-grid-item"):
                    continue

                title = div_card.text
                a_element = div_card.find("a")
                # Kept apart from `url`: overwriting it would make a retry request
                # the last pin instead of the search page.
                pin_url = f"https://pinterest.com{a_element['href']}"
                img_url = div_card.find("img")["src"]

                search_data = SearchData(
                    name=title,
                    url=pin_url,
                    image=img_url
                )
                data_pipeline.add_data(search_data)

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the attempt; without this a persistent failure loops forever.
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def process_pin(row, location, retries=3):
    """Fetch one pin page through the proxy and save its details to a CSV named after the row.

    Args:
        row: CSV row dict; "url" is the pin page to fetch, and the first 20
            characters of "name" name the output file.
        location: Forwarded to get_scrapeops_url().
        retries: Number of additional attempts allowed after a failed one.

    Raises:
        Exception: When no attempt succeeded.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Requested inside the try so connection errors are retried as well.
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            main_card = soup.select_one("div[data-test-id='CloseupDetails']")

            website = "n/a"
            has_website = main_card.select_one("span[style='text-decoration: underline;']")
            if has_website:
                website = f"https://{has_website.text}"

            # The rating is the number of full-star elements on the card.
            star_divs = main_card.select("div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.select_one("div[data-test-id='follower-count']")

            account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_all("div")
            account_name = nested_divs[0].get("title")
            # What is left after removing the account name and " followers" is the count.
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
            img = img_container.find("img").get("src")

            pin_data = PinData(
                name=account_name,
                website=website,
                stars=stars,
                follower_count=follower_count,
                image=img
            )

            # Each pin gets its own pipeline, and therefore its own CSV file.
            pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv")
            pin_pipeline.add_data(pin_data)
            pin_pipeline.close_pipeline()

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and scrape its pins concurrently on a thread pool.

    Args:
        csv_file: Path of the CSV file to read; each row is handed to process_pin().
        location: Passed to process_pin() for every row.
        max_threads: Number of worker threads in the pool.
        retries: Passed to process_pin() for every row.
    """
    logger.info(f"processing {csv_file}")
    # The pipeline writes its CSV files as UTF-8, so read them back the same way
    # instead of relying on the platform's default encoding.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    # map() takes one iterable per process_pin() argument, hence the repeated lists.
    # Its results are never consumed, so an exception raised by process_pin() is not
    # re-raised here; leaving the `with` block waits for all submitted work to finish.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_pin,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )

if __name__ == "__main__":
    # Entry point: crawl the search results of each keyword into "<keyword>.csv",
    # then process every row of those files.

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Now that we've got our production scraper, it's time for our production run. Once again, take a look at the main and feel free to change any constant you want.

if __name__ == "__main__":
    # Entry point: crawl the search results of each keyword into "<keyword>.csv",
    # then process every row of those files.

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Here are the results.

Scraper Results

The crawl and the scraping process finished in 51.88 seconds. All in all, this comes out to just over 2 seconds per page. Considering that ScrapeOps is waiting 2 seconds before sending each page back to us, 2 seconds per page is incredible!!!


Whenever you scrape a website, you need to be aware of the Terms of Service and robots.txt. You can view Pinterest's terms here.

If you access private data on their site in a way that violates these terms, you can even lose your Pinterest account! You can view their robots.txt here.

Also, keep in mind whether you are scraping public data. Private data (data behind a login) can often be illegal to scrape. Generally, public data (data not behind a login) is public information and therefore fair game when scraping.

If you are unsure of the legality of your scraper, it is best to consult an attorney based in your jurisdiction.


Conclusion

You made it! Congratulations on finishing this tutorial. You now know how to build both a crawler and a scraper. You also have a solid grasp of parsing, data storage, concurrency, and proxy integration.

You should also have a solid grasp of how to use requests and beautifulsoup.


More Web Scraping Guides

Now that you know how to scrape Pinterest, you have a whole new skillset for your scraping toolbox. Take this knowledge and go build something!!!

If you're in the mood to learn more, check out our Python Web Scraping Playbook or one of these cool articles below!!!