
How to Scrape Pinterest With Selenium

Pinterest is the perfect place on the web for all sorts of creative ideas. Whether you're looking for recipes, cool office ideas, or pretty much anything else, somebody has likely posted something to inspire you on Pinterest. Pinterest is also a social network, so in addition to extracting data from individual posts (pins), we can pull other important details such as the account that made the pin and its follower count.

In this guide, you will learn how to scrape Pinterest.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Pinterest

Looking to scrape Pinterest but don't have time for a tutorial? Use the script below.

Just create a config.json file with your ScrapeOps API key, place it in the same folder as this script, and you're ready to go!
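Your config.json only needs a single field, your ScrapeOps API key (the same format is shown again later in the article):

{
    "api_key": "YOUR-SUPER-SECRET-API-KEY"
}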

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep

OPTIONS = webdriver.ChromeOptions()

# Disable JavaScript in the local browser; the ScrapeOps headless browser renders the page for us
prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
OPTIONS.add_argument(f"--user-agent={user_agent}")

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


# Wrap a target URL with the ScrapeOps Proxy API
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class PinData:
    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    tries = 0
    success = False

    while tries <= retries and not success:
        url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
        driver = webdriver.Chrome(options=OPTIONS)
        driver.set_page_load_timeout(30)
        driver.implicitly_wait(10)
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div")

            print("found div cards:", len(div_cards))

            for div_card in div_cards:
                is_card = div_card.get_attribute("data-grid-item")
                if is_card:
                    a_element = div_card.find_element(By.CSS_SELECTOR, "a")
                    title = a_element.get_attribute("aria-label")
                    href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
                    url = f"https://pinterest.com{href}"
                    img = div_card.find_element(By.CSS_SELECTOR, "img")
                    img_url = img.get_attribute("src")

                    search_data = SearchData(
                        name=title,
                        url=url,
                        image=img_url
                    )
                    data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def process_pin(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:

        driver = webdriver.Chrome(options=OPTIONS)
        driver.get(get_scrapeops_url(url, location=location))

        try:
            main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']")
            pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv")
            website = "n/a"

            website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']")
            has_website = len(website_holder) > 0
            if has_website:
                website = f"https://{website_holder[0].text}"

            # Each full star is its own element, so the rating is the element count
            star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']")

            account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div")
            account_name = nested_divs[0].get_attribute("title")
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img = "n/a"
            img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']")
            if len(img_container) > 0:
                img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src")

            pin_data = PinData(
                name=account_name,
                website=website,
                stars=stars,
                follower_count=follower_count,
                image=img
            )

            pin_pipeline.add_data(pin_data)
            pin_pipeline.close_pipeline()

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

        finally:
            driver.quit()
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_pin,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

To change your results, feel free to tweak any of the following (see the short example after this list):

  • MAX_RETRIES: This parameter sets the maximum number of attempts the script will make to fetch data from a URL if the initial request fails.
  • MAX_THREADS: This parameter sets the maximum number of threads to use for processing results concurrently. This can speed up the processing of multiple pins or search results.
  • LOCATION: This parameter sets the geographical location from which the requests are made. It can affect the content returned by the website due to region-specific restrictions or differences.
  • keyword_list: This list contains the keywords for which you want to scrape Pinterest search results.
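For example, if you wanted to search two keywords and route your requests through the UK, the constants at the top of main would look something like this (illustrative values only; the rest of the script stays the same):

if __name__ == "__main__":

    MAX_RETRIES = 5                              # retry each failed page up to 5 times
    MAX_THREADS = 3                              # scrape up to 3 pins concurrently
    LOCATION = "uk"                              # route requests through a UK server
    keyword_list = ["grilling", "office decor"]  # one output CSV per keyword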

How To Architect Our Pinterest Scraper

For this scraping project, we'll actually be building two scrapers.

  1. Our first scraper will be the crawler. The crawler needs to perform a search, pull information from the search and save it to a CSV file.
  2. Once our crawler is spitting out CSV files, we'll build a pin scraper that reads each CSV file and scrapes every pin in it individually.

We'll utilize the following in this project:

  1. Parsing: to extract the important data from Pinterest.
  2. Data Storage: To store our data for later review and also to feed information into our scraper.
  3. Concurrency: to process multiple pages simultaneously and efficiently.
  4. Proxy Integration: Pinterest is notoriously difficult to access programmatically, so we'll be using the ScrapeOps Proxy API.

Understanding How To Scrape Pinterest

Step 1: How To Request Pinterest Pages

Any scrape starts with a simple GET request to the site's server and Pinterest is no exception. When you type a domain name into your address bar, you're performing a GET request.

If we perform a search for "grilling", our URL looks like this

https://www.pinterest.com/search/pins/?q=grilling&rs=typed
  • Our base endpoint is https://www.pinterest.com/search/pins/.
  • The ? character marks the beginning of the query string; individual parameters are separated by & when there is more than one.
  • In this example, our parameters are q=grilling and rs=typed, so the full query string is ?q=grilling&rs=typed.

Take a look at the image below and examine it for yourself.

Pinterest Search Results

Individual pins all receive their own unique number on Pinterest. Here is the URL for the pin below:

https://www.pinterest.com/pin/45176802505307132/

The unique number for this pin is 45176802505307132. For any pin on Pinterest, the URL gets laid out like this:

https://www.pinterest.com/pin/PIN-NUMBER-GOES-HERE/

Pin Page
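
To make the URL structure concrete, here is a small standalone sketch that builds both kinds of URL; the helper names are just for illustration and aren't used elsewhere in this article:

from urllib.parse import quote_plus

def build_search_url(keyword: str) -> str:
    # Encode the keyword so spaces and special characters are URL-safe
    return f"https://www.pinterest.com/search/pins/?q={quote_plus(keyword)}&rs=typed"

def build_pin_url(pin_number: str) -> str:
    # Every pin lives at /pin/<unique number>/
    return f"https://www.pinterest.com/pin/{pin_number}/"

print(build_search_url("grilling"))
# https://www.pinterest.com/search/pins/?q=grilling&rs=typed
print(build_pin_url("45176802505307132"))
# https://www.pinterest.com/pin/45176802505307132/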


Step 2: How To Extract Data From Pinterest Results and Pages

Data from Pinterest can be quite a pain to extract. Not only is it deeply nested within the page, it is all generated dynamically. Along with the dynamic content, Pinterest uses numerous frontend JavaScript tactics to block you even when you're routed through a proxy.

Take a look at the results page below and you can see how nasty the HTML is.

Results Page Inspection

And here is our pin page.

Pin Page Inspection

To get around this, we'll be using the ScrapeOps API along with its built-in headless browser. Even though we're using Selenium, we're actually going to disable JavaScript locally so that Pinterest's scripts can't run and get us blocked!


Step 3: Geolocated Data

To handle geolocation, we'll be passing a country parameter into the ScrapeOps API.

  • With the ScrapeOps API, if you pass "us" in as your country, you'll be routed through a server in the US.
  • If you want to be routed through the UK, you can pass "uk".

In Pinterest's case, geolocation is pretty important. They sometimes even block proxies. If you are getting blocked while using a proxy, try changing your location.

This worked for us every time in testing.
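
Under the hood, the location is just the country field in the proxy payload. Here's a minimal sketch of the same structure used by get_scrapeops_url() later in this article, switched over to the UK:

from urllib.parse import urlencode

payload = {
    "api_key": "YOUR-SUPER-SECRET-API-KEY",
    "url": "https://www.pinterest.com/search/pins/?q=grilling&rs=typed",
    "country": "uk",   # change this if one region keeps getting blocked
    "wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)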


Setting Up Our Pinterest Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir pinterest-scraper

cd pinterest-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install selenium

Make sure you have a webdriver (e.g. ChromeDriver) installed! If you don't, you can check here
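
If you'd rather not manage the ChromeDriver binary by hand, one option (not used in the rest of this article) is the webdriver-manager package, which downloads a driver that matches your installed Chrome; recent versions of Selenium (4.6+) can also resolve the driver automatically:

# pip install webdriver-manager
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Download (and cache) a matching ChromeDriver, then start Chrome with it
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
driver.get("https://www.pinterest.com")
print(driver.title)
driver.quit()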


Build A Pinterest Search Crawler

As we mentioned earlier in this article, the first scraper we need to build is our crawler. The crawler is going to use the following:

  1. Parsing: to extract valuable data from the page.
  2. Data Storage: to store our data in a safe and efficient manner.
  3. Proxy Integration: to get past anti-bots and anything else that might block us.

Step 1: Create Simple Search Data Parser

We'll start by setting up a basic scraper with some error handling and retry logic. Along with that, we'll read our API key from a config file, config.json. Simply create this file and add your API key to it.

The entire config file should look like this:

{
    "api_key": "YOUR-SUPER-SECRET-API-KEY"
}

Here is our full code so far:

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep

OPTIONS = webdriver.ChromeOptions()

prefs = {
"profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
OPTIONS.add_argument(f"useragent={user_agent}")

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
tries = 0
success = False

while tries <= retries and not success:
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
driver = webdriver.Chrome(options=OPTIONS)
driver.set_page_load_timeout(30)
driver.implicitly_wait(10)
try:
driver.get(url)
logger.info(f"Fetched {url}")

## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div")

print("found div cards:", len(div_cards))


for div_card in div_cards:
is_card = div_card.get_attribute("data-grid-item")
if is_card:
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
title = a_element.get_attribute("aria-label")
href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
url = f"https://pinterest.com{href}"
img = div_card.find_element(By.CSS_SELECTOR, "img")
img_url = img.get_attribute("src")

search_data = {
"name": title,
"url": url,
"image": img_url
}

print(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)

logger.info(f"Crawl complete.")

In the example above, pay attention to our parsing logic:

  • is_card = div_card.get_attribute("data-grid-item") determines whether or not each div is a search result. All search results contain the data-grid-item attribute.
  • We pull the title with a_element.get_attribute("aria-label").
  • We find the URL of each pin with a_element.get_attribute("href").replace("https://proxy.scrapeops.io", ""), replacing the ScrapeOps proxy domain with Pinterest's domain.
  • Once we've parsed through all these results, we set success to True and exit the function.

Step 2: Storing the Scraped Data

Pulling the right data doesn't do us much good if we can't store it. To store our data, we'll be adding both a SearchData class and a DataPipeline class (a short usage sketch follows the list below).

  1. SearchData simply takes our data and turns it into a uniform object that holds it.
  2. Once we have our SearchData, we can then pass it into the DataPipeline which filters out our duplicates and saves all of our relevant information to a CSV file.
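
In isolation, the flow looks like this; a tiny illustrative sketch (made-up values) that assumes the SearchData and DataPipeline classes defined in the code below:

# Build a uniform object from raw scraped values
search_data = SearchData(
    name="Grilled steak tips",
    url="https://pinterest.com/pin/45176802505307132/",
    image="https://i.pinimg.com/example.jpg"
)

# The pipeline drops duplicates (by name) and flushes to CSV in batches
pipeline = DataPipeline(csv_filename="example.csv")
pipeline.add_data(search_data)
pipeline.close_pipeline()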

Take a look at our updated code now:

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep

OPTIONS = webdriver.ChromeOptions()

prefs = {
"profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
OPTIONS.add_argument(f"useragent={user_agent}")

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
tries = 0
success = False

while tries <= retries and not success:
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
driver = webdriver.Chrome(options=OPTIONS)
driver.set_page_load_timeout(30)
driver.implicitly_wait(10)
try:
driver.get(url)
logger.info(f"Fetched {url}")

## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div")

print("found div cards:", len(div_cards))


for div_card in div_cards:
is_card = div_card.get_attribute("data-grid-item")
if is_card:
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
title = a_element.get_attribute("aria-label")
href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
url = f"https://pinterest.com{href}"
img = div_card.find_element(By.CSS_SELECTOR, "img")
img_url = img.get_attribute("src")

search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)



logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Quick recap of our changes:

  • Instead of turning our search_data into a dict, we use it to build a SearchData object.
  • Once we have our search_data, we pass it into the data_pipeline.
  • After this operation is complete and we've exited scrape_search_results(), we close the pipeline.

Step 3: Bypassing Anti-Bots

As you may have noticed in our earlier examples, we're not running in headless mode and we have JavaScript turned off.

The reason for this: we let ScrapeOps render the page for us (rendering in headless mode sometimes causes issues with Selenium), and we can't let our local browser execute the JavaScript that Pinterest uses for authentication and for fetching more content... it would destroy the page.

The options below show our ChromeOptions. In the prefs, you should see "profile.managed_default_content_settings.javascript": 2. This turns off JavaScript support.

OPTIONS = webdriver.ChromeOptions()

# Setting this pref to 2 disables JavaScript in the local browser
prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
OPTIONS.add_argument(f"--user-agent={user_agent}")

Here is the function we'll be using for proxy support:

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

Here is our production-ready crawler:

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep

OPTIONS = webdriver.ChromeOptions()

prefs = {
"profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
OPTIONS.add_argument(f"useragent={user_agent}")

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
tries = 0
success = False

while tries <= retries and not success:
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
driver = webdriver.Chrome(options=OPTIONS)
driver.set_page_load_timeout(30)
driver.implicitly_wait(10)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")

## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div")

print("found div cards:", len(div_cards))


for div_card in div_cards:
is_card = div_card.get_attribute("data-grid-item")
if is_card:
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
title = a_element.get_attribute("aria-label")
href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
url = f"https://pinterest.com{href}"
img = div_card.find_element(By.CSS_SELECTOR, "img")
img_url = img.get_attribute("src")

search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)



logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

Step 4: Production Run

Now it's time to run it in production and test it out. Here is our main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

If you're following along, feel free to tweak these constants any way you'd like and experiment with the results.

Here are the results from our production run.

Crawler Results

It took 16.68 seconds to parse the page and save the results. Your results will probably vary based on your hardware, internet connection, and the location of your server.


Build A Pinterest Scraper

When we build our pin scraper, we need to incorporate the following things into our design:

  1. Parse the information from a pin.
  2. Read the rows from the CSV file.
  3. Store the data we extracted when parsing.
  4. Perform all these actions concurrently.
  5. Integrate with the ScrapeOps Proxy API

Step 1: Create Simple Data Parser

Just like before, we'll get started with a simple parsing function. Take a look at the snippet below; while the parsing logic looks different, the overall structure is virtually identical.

def process_pin(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:

        driver = webdriver.Chrome(options=OPTIONS)
        driver.get(url)

        try:
            main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']")
            website = "n/a"

            website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']")
            has_website = len(website_holder) > 0
            if has_website:
                website = f"https://{website_holder[0].text}"

            star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']")

            account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div")
            account_name = nested_divs[0].get_attribute("title")
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img = "n/a"
            img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']")
            if len(img_container) > 0:
                img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src")

            pin_data = {
                "name": account_name,
                "website": website,
                "stars": stars,
                "follower_count": follower_count,
                "image": img
            }

            print(pin_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

        finally:
            driver.quit()
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")

Key points from above:

  • We use find_elements() to look for website_holder.
  • If website_holder contains at least one element, a website is present, so we replace the default "n/a" with the actual website.
  • We find all the star_divs. Each star is a unique element on the page, so we can obtain the rating by counting these elements.
  • We then find the follower_count, account_name and the image of the pin.

Step 2: Loading URLs To Scrape

Our parsing function doesn't do much good if our scraper doesn't know what to parse.

  • This function uses csv.DictReader() to read the file into a list of rows.
  • We then pass each row from the array into process_pin().
  • Later on, we'll add concurrency to this function, but for now, we'll use a for loop as a placeholder.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_pin(row, location, retries=retries)

Here is our fully updated code:

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep

OPTIONS = webdriver.ChromeOptions()

prefs = {
"profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
OPTIONS.add_argument(f"useragent={user_agent}")

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
tries = 0
success = False

while tries <= retries and not success:
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
driver = webdriver.Chrome(options=OPTIONS)
driver.set_page_load_timeout(30)
driver.implicitly_wait(10)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")

## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div")

print("found div cards:", len(div_cards))


for div_card in div_cards:
is_card = div_card.get_attribute("data-grid-item")
if is_card:
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
title = a_element.get_attribute("aria-label")
href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
url = f"https://pinterest.com{href}"
img = div_card.find_element(By.CSS_SELECTOR, "img")
img_url = img.get_attribute("src")

search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)



logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def process_pin(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:

driver = webdriver.Chrome(options=OPTIONS)
driver.get(url)

try:
main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']")
website = "n/a"

website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']")
has_website = len(website_holder) > 0
if has_website:
website = f"https://{website_holder[0].text}"

star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']")
stars = len(star_divs)

profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']")

account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']")
nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div")
account_name = nested_divs[0].get_attribute("title")
follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

img = "n/a"
img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']")
if len(img_container) > 0:
img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src")

pin_data = {
"name": account_name,
"website": website,
"stars": stars,
"follower_count": follower_count,
"image": img
}

print(pin_data)

success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1

finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_pin(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

Time to store our data... sound familiar? In this section we'll create another dataclass, PinData. Take a look below; our PinData is actually very similar to SearchData.

This object will even be passed into the DataPipeline the same way.

@dataclass
class PinData:
    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

Now, let's update our script:

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep

OPTIONS = webdriver.ChromeOptions()

prefs = {
"profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
OPTIONS.add_argument(f"useragent={user_agent}")

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class PinData:
name: str = ""
website: str = ""
stars: int = 0
follower_count: str = ""
image: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
tries = 0
success = False

while tries <= retries and not success:
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
driver = webdriver.Chrome(options=OPTIONS)
driver.set_page_load_timeout(30)
driver.implicitly_wait(10)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")

## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div")

print("found div cards:", len(div_cards))


for div_card in div_cards:
is_card = div_card.get_attribute("data-grid-item")
if is_card:
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
title = a_element.get_attribute("aria-label")
href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
url = f"https://pinterest.com{href}"
img = div_card.find_element(By.CSS_SELECTOR, "img")
img_url = img.get_attribute("src")

search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)



logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def process_pin(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:

driver = webdriver.Chrome(options=OPTIONS)
driver.get(url)

try:
main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']")
pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv")
website = "n/a"

website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']")
has_website = len(website_holder) > 0
if has_website:
website = f"https://{website_holder[0].text}"

star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']")
stars = len(star_divs)

profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']")

account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']")
nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div")
account_name = nested_divs[0].get_attribute("title")
follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

img = "n/a"
img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']")
if len(img_container) > 0:
img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src")

pin_data = PinData(
name=account_name,
website=website,
stars=stars,
follower_count=follower_count,
image=img
)

pin_pipeline.add_data(pin_data)
pin_pipeline.close_pipeline()


success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1

finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_pin(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

The main differences in this version of the script are:

  • Inside of process_pin() we instantiate a DataPipeline object.
  • Once we've parsed our data, we use it to create a PinData object.
  • We pass the pin_data variable into the pipeline.
  • After we've finished using the pipeline, we close it and exit the function.

Step 4: Adding Concurrency

Efficiency and speed are key when doing any large task at scale. To make our scraper faster and more efficient, we're going to add concurrency with ThreadPoolExecutor.

Here we make a simple but significant change to process_results().

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_pin,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )

Time to explain the arguments to executor.map() (a short standalone example follows this list):

  • process_pin is the function we wish to run on multiple threads
  • reader is the array of objects we want to pass into the function
  • We also pass in our location and our retries as lists, repeated once per row, so every call receives the same values.
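
If the multiple-iterables form of executor.map() is new to you, here is a tiny standalone example of the same pattern with made-up values:

import concurrent.futures

def label(row, location, retries):
    return f"{row['name']} ({location}, retries={retries})"

rows = [{"name": "pin-a"}, {"name": "pin-b"}, {"name": "pin-c"}]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # map() zips the iterables together: label(rows[0], "us", 3), label(rows[1], "us", 3), ...
    results = list(executor.map(label, rows, ["us"] * len(rows), [3] * len(rows)))

print(results)
# ['pin-a (us, retries=3)', 'pin-b (us, retries=3)', 'pin-c (us, retries=3)']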

Step 5: Bypassing Anti-Bots

We're almost ready for production, but before we can run our scraper, we need to once again add in proxy support. Since we already have our get_scrapeops_url() function, we only need to change one line.

driver.get(get_scrapeops_url(url, location=location))

Here is our production-ready code containing both the crawler and the scraper:

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep

OPTIONS = webdriver.ChromeOptions()

prefs = {
"profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
OPTIONS.add_argument(f"useragent={user_agent}")

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class PinData:
name: str = ""
website: str = ""
stars: int = 0
follower_count: str = ""
image: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
tries = 0
success = False

while tries <= retries and not success:
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
driver = webdriver.Chrome(options=OPTIONS)
driver.set_page_load_timeout(30)
driver.implicitly_wait(10)
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
logger.info(f"Fetched {url}")

## Extract Data
div_cards = driver.find_elements(By.CSS_SELECTOR, "div")

print("found div cards:", len(div_cards))


for div_card in div_cards:
is_card = div_card.get_attribute("data-grid-item")
if is_card:
a_element = div_card.find_element(By.CSS_SELECTOR, "a")
title = a_element.get_attribute("aria-label")
href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
url = f"https://pinterest.com{href}"
img = div_card.find_element(By.CSS_SELECTOR, "img")
img_url = img.get_attribute("src")

search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)



logger.info(f"Successfully parsed data from: {url}")
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")

finally:
driver.quit()

if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def process_pin(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:

driver = webdriver.Chrome(options=OPTIONS)
driver.get(get_scrapeops_url(url, location=location))

try:
main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']")
pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv")
website = "n/a"

website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']")
has_website = len(website_holder) > 0
if has_website:
website = f"https://{website_holder[0].text}"

star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']")
stars = len(star_divs)

profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']")

account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']")
nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div")
account_name = nested_divs[0].get_attribute("title")
follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

img = "n/a"
img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']")
if len(img_container) > 0:
img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src")

pin_data = PinData(
name=account_name,
website=website,
stars=stars,
follower_count=follower_count,
image=img
)

pin_pipeline.add_data(pin_data)
pin_pipeline.close_pipeline()


success = True

except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1

finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")




def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_pin,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []

## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")

crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")

for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Step 6: Production Run

Time to run the whole thing in production. Feel free to take a look at the main again and tweak whatever constants you'd like.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)

Here are the results:

Scraper Results

The operation finished in roughly 103 seconds, which comes out to an average of about 4.68 seconds per page (roughly 22 pages in total). Considering that we're telling the proxy to wait 2 seconds before returning each page, this is pretty decent.


Legal and Ethical Considerations

Whenever you scrape a website, you need to be aware of its Terms of Service and robots.txt. You can view Pinterest's terms here.

If you access private data on their site in a way that violates these terms, you can even lose your Pinterest account! You can view their robots.txt here.

Also, keep in mind whether you are scraping public or private data. Private data (data behind a login) can often be illegal to scrape. Generally, public data (data not behind a login) is public information and therefore fair game when scraping.

If you are unsure of the legality of your scraper, it is best to consult an attorney based in your jurisdiction.


Conclusion

Congratulations! You've finished this tutorial. Take your new knowledge of Selenium, CSS selectors, parsing, proxy integration, data storage and build something!

If you'd like to know more about the tech stack used in this article, take a look at the links below.


More Web Scraping Guides

Here at ScrapeOps, we've got tons of useful content.

Check out our Selenium Web Scraping Playbook or read one of the articles below and level up your scraping skills!!!