How to Scrape Pinterest With Selenium
Pinterest is the perfect place on the web for all sorts of creative ideas. Whether you're looking for recipes, cool office ideas, or pretty much anything else, somebody has likely posted something to inspire you on Pinterest. Not only does Pinterest have these things, but it's also a social network. This allows us not only to extract data from individual posts (pins), but other important things such as the account that made the pin, and their follower count.
In this guide, you will learn how to scrape Pinterest.
- TLDR: How to Scrape Pinterest
- How To Architect Our Scraper
- Understanding How To Scrape Pinterest
- Setting Up Our Pinterest Scraper
- Build A Pinterest Search Crawler
- Build A Pinterest Scraper
- Legal and Ethical Considerations
- Conclusion
- More Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Pinterest
Looking to scrape Pinterest but don't have time for a tutorial? Use the scraper below.
Just make a config.json
file with your API key and place it in the same folder as this file and you're ready to go!
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep
# Browser options shared by every Chrome instance this script launches.
OPTIONS = webdriver.ChromeOptions()
# Content-setting value 2 turns JavaScript off in the launched browser.
prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
# Chrome only recognizes the switch when it is spelled "--user-agent=...";
# a bare "useragent=..." argument is not a switch and leaves the default UA in place.
OPTIONS.add_argument(f"--user-agent={user_agent}")

# The ScrapeOps API key is read from config.json in the working directory.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Wrap a target URL in a ScrapeOps proxy request URL.

    Args:
        url: The page to fetch through the proxy.
        location: Value sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the API key, target URL, country and a
        ``wait`` of 2000 encoded into its query string.
    """
    query = urlencode(
        {"api_key": API_KEY, "url": url, "country": location, "wait": 2000}
    )
    return "https://proxy.scrapeops.io/v1/?" + query
## Logging
# Configure the root logger at INFO level and create this module's logger,
# which the functions below use for progress and error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One pin found on a Pinterest search results page.

    Text fields are normalized right after construction: missing values
    become ``"No <field name>"`` and everything else is stripped of
    surrounding whitespace.
    """

    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace missing text fields with a placeholder and strip the rest."""
        # Named "spec" so the loop does not shadow dataclasses.field.
        for spec in fields(self):
            value = getattr(self, spec.name)
            # Selenium's get_attribute() returns None for an absent attribute;
            # treat that the same as an empty string for text fields.
            if value is None and spec.type in (str, "str"):
                value = ""
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, spec.name, f"No {spec.name}")
                continue
            # Strip any trailing spaces, etc.
            setattr(self, spec.name, value.strip())
@dataclass
class PinData:
    """Details scraped from a single pin page.

    Text fields are normalized right after construction: missing values
    become ``"No <field name>"`` and everything else is stripped of
    surrounding whitespace. Non-text fields such as ``stars`` are left as-is.
    """

    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace missing text fields with a placeholder and strip the rest."""
        # Named "spec" so the loop does not shadow dataclasses.field.
        for spec in fields(self):
            value = getattr(self, spec.name)
            # Selenium's get_attribute() returns None for an absent attribute;
            # treat that the same as an empty string for text fields only.
            if value is None and spec.type in (str, "str"):
                value = ""
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, spec.name, f"No {spec.name}")
                continue
            # Strip any trailing spaces, etc.
            setattr(self, spec.name, value.strip())
class DataPipeline:
    """Collect scraped dataclass items, drop repeats by name, and append them to a CSV file.

    Items are buffered in ``storage_queue`` and written out whenever the
    buffer reaches ``storage_queue_limit`` or the pipeline is closed.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file and empty the queue.

        A header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag, including on the empty-queue early return;
            # a flag left at True would block every later automatic flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with the same ``name`` was already seen; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a new item unless it is a duplicate, flushing once the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write out whatever is still queued."""
        if self.csv_file_open:
            # The file imports `sleep` directly (from time import sleep);
            # the `time` module itself is never imported.
            sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Crawl one Pinterest search page through the proxy and store every pin card found.

    Args:
        keyword: Search phrase; spaces are converted to "+" for the query string.
        location: Country code forwarded to ``get_scrapeops_url``.
        data_pipeline: Object exposing ``add_data()``; receives one
            ``SearchData`` per pin card. When omitted, parsed pins are only logged.
        retries: Number of additional attempts made after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    # Kept separate from the per-pin URLs built below so that the request and
    # the log lines always refer to the search page itself.
    search_url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        driver.set_page_load_timeout(30)
        driver.implicitly_wait(10)
        try:
            scrapeops_proxy_url = get_scrapeops_url(search_url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {search_url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div")
            print("found div cards:", len(div_cards))

            for div_card in div_cards:
                # Only divs with a truthy data-grid-item attribute are search results.
                if not div_card.get_attribute("data-grid-item"):
                    continue

                a_element = div_card.find_element(By.CSS_SELECTOR, "a")
                title = a_element.get_attribute("aria-label")
                # Drop the proxy host from the link, then rebuild it on Pinterest's domain.
                href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
                pin_url = f"https://pinterest.com{href}"
                img_url = div_card.find_element(By.CSS_SELECTOR, "img").get_attribute("src")

                search_data = SearchData(
                    name=title,
                    url=pin_url,
                    image=img_url
                )
                if data_pipeline is not None:
                    data_pipeline.add_data(search_data)
                else:
                    logger.info(f"Parsed (no pipeline configured): {search_data}")

            logger.info(f"Successfully parsed data from: {search_url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Without this increment a failing page would be retried forever.
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def process_pin(row, location, retries=3):
    """Scrape one pin page through the proxy and save its details to a per-pin CSV file.

    Args:
        row: Mapping with at least ``"url"`` (the pin page) and ``"name"``
            (used to build the CSV filename).
        location: Country code forwarded to ``get_scrapeops_url``.
        retries: Number of additional attempts made after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            # The page load lives inside the try so that a failed load still
            # closes the browser and counts as one attempt.
            driver.get(get_scrapeops_url(url, location=location))

            main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']")
            # NOTE(review): only spaces are replaced here; other characters in
            # the pin name (e.g. "/") end up in the filename unchanged.
            pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv")

            website = "n/a"
            website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']")
            if website_holder:
                website = f"https://{website_holder[0].text}"

            # Each full star is its own element, so the count is the rating.
            star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']")
            account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div")
            account_name = nested_divs[0].get_attribute("title")
            # The profile block's text holds the account name and the follower
            # figure; remove the name and the " followers" label to leave the figure.
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img = "n/a"
            img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']")
            if img_container:
                img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src")

            pin_data = PinData(
                name=account_name,
                website=website,
                stars=stars,
                follower_count=follower_count,
                image=img
            )
            pin_pipeline.add_data(pin_data)
            pin_pipeline.close_pipeline()

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Run ``process_pin`` concurrently for every row of a crawl CSV.

    Args:
        csv_file: Path to the CSV file to read rows from.
        location: Passed through to ``process_pin``.
        max_threads: Size of the worker thread pool.
        retries: Passed through to ``process_pin``.

    A pin that still fails after its retries is logged and skipped; the
    remaining pins are processed regardless.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its CSV files as UTF-8, so read them back the same way.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        pending = {
            executor.submit(process_pin, row, location, retries): row
            for row in reader
        }
        # Inspect every future: a bare executor.map() whose results are never
        # read discards worker exceptions without any trace.
        for future in concurrent.futures.as_completed(pending):
            error = future.exception()
            if error is not None:
                logger.error(f"Giving up on {pending[future].get('url')}: {error}")
if __name__ == "__main__":

    # Run settings: retry budget per page, worker threads for the pin
    # scraper, and the country code sent to the proxy.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        try:
            scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        finally:
            # Flush whatever was queued even when the crawl gives up, so
            # already-parsed rows are not lost with the exception.
            crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")

    # Second stage: scrape every pin listed in each crawl CSV.
    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To change your results, feel free to tweak any of the following:
- MAX_RETRIES: The maximum number of retry attempts the script will make to fetch data from a URL after the initial request fails.
- MAX_THREADS: The maximum number of threads used to process results concurrently. This can speed up the processing of multiple pins or search results.
- LOCATION: The geographical location from which the requests are made. It can affect the content returned by the website due to region-specific restrictions or differences.
- keyword_list: The list of keywords for which you want to scrape Pinterest search results.
How To Architect Our Pinterest Scraper
For this scraping project, we'll actually be building two scrapers.
- Our first scraper will be the crawler. The crawler needs to perform a search, pull information from the search and save it to a CSV file.
- Once our crawler is spitting out CSV files, we'll make a pin scraper that reads the CSV file and scrapes every pin individually from the CSV file.
We'll utilize the following in this project:
- Parsing: to extract the important data from Pinterest.
- Data Storage: To store our data for later review and also to feed information into our scraper.
- Concurrency: to process multiple pages simultaneously and efficiently.
- Proxy Integration: Pinterest is notoriously difficult to access programmatically, so we'll be using the ScrapeOps Proxy API.
Understanding How To Scrape Pinterest
Step 1: How To Request Pinterest Pages
Any scrape starts with a simple GET request to the site's server and Pinterest is no exception. When you type a domain name into your address bar, you're performing a GET request.
If we perform a search for "grilling"
, our URL looks like this
https://www.pinterest.com/search/pins/?q=grilling&rs=typed
- Our base URL is `https://www.pinterest.com/search/pins/`.
- The `?` character marks the start of the query string; multiple parameters are separated by `&`.
- In this example, our query parameters are `q=grilling` and `rs=typed`. The full query string is `?q=grilling&rs=typed`.
Take a look at the image below and examine it for yourself.
Individual pins all receive their own unique number on Pinterest. Here is the URL for the pin below:
https://www.pinterest.com/pin/45176802505307132/
The unique number for this pin is 45176802505307132
. For any pin on Pinterest, the URL gets laid out like this:
https://www.pinterest.com/pin/PIN-NUMBER-GOES-HERE/
Step 2: How To Extract Data From Pinterest Results and Pages
Data from Pinterest can be quite a pain to extract. Not only is it deeply nested within the page, it is all generated dynamically. Along with the dynamic content, Pinterest uses numerous frontend JavaScript tactics to block you even when you're routed through a proxy.
Take a look at the results page below and you can see how nasty the HTML is.
And here is our pin page.
In order to get around this, we'll be using the ScrapeOps API along with its builtin headless browser. Even though we're using Selenium, we're actually going to have to disable JavaScript so we can stop from getting blocked!
Step 3: Geolocated Data
To handle geolocation, we'll be passing a country
parameter into the ScrapeOps API.
- With the ScrapeOps API, if you pass `"us"` in as your country, you'll be routed through a server in the US.
- If you want to be routed through the UK, you can pass `"uk"`.
In Pinterest's case, geolocation is pretty important. They sometimes even block proxies. If you are getting blocked while using a proxy, try changing your location.
This worked for us every time in testing.
Setting Up Our Pinterest Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir pinterest-scraper
cd pinterest-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install selenium
Make sure you have webdriver installed! If you don't, you can check here
Build A Pinterest Search Crawler
As we mentioned earlier in this article, the first scraper we need to build is our crawler. The crawler is going to use the following:
- Parsing: to extract valuable data from the page.
- Data Storage: to store our data in a safe and efficient manner.
- Proxy Integration: to get past anti-bots and anything else that might block us.
Step 1: Create Simple Search Data Parser
We'll start by setting up a basic scraper with some error handling and retry logic. Along with that, we'll read our API key from a config file, config.json
. Simply create this file and add your API key to it.
The entire config file should look like this:
{
"api_key": "YOUR-SUPER-SECRET-API-KEY"
}
Here is our full code so far:
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep
# Browser options shared by every Chrome instance this script launches.
OPTIONS = webdriver.ChromeOptions()
# Content-setting value 2 turns JavaScript off in the launched browser.
prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
# Chrome only recognizes the switch when it is spelled "--user-agent=...";
# a bare "useragent=..." argument is not a switch and leaves the default UA in place.
OPTIONS.add_argument(f"--user-agent={user_agent}")

# The ScrapeOps API key is read from config.json in the working directory.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Fetch one Pinterest search page and print every pin card found on it.

    Args:
        keyword: Search phrase; spaces are converted to "+" for the query string.
        location: Accepted for interface parity; not used by this version.
        data_pipeline: Accepted for interface parity; not used by this version.
        retries: Number of additional attempts made after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    # Kept separate from the per-pin URLs built below so that the request and
    # the log lines always refer to the search page itself.
    search_url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        driver.set_page_load_timeout(30)
        driver.implicitly_wait(10)
        try:
            driver.get(search_url)
            logger.info(f"Fetched {search_url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div")
            print("found div cards:", len(div_cards))

            for div_card in div_cards:
                # Only divs with a truthy data-grid-item attribute are search results.
                if not div_card.get_attribute("data-grid-item"):
                    continue

                a_element = div_card.find_element(By.CSS_SELECTOR, "a")
                title = a_element.get_attribute("aria-label")
                # Drop the proxy host from the link, then rebuild it on Pinterest's domain.
                href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
                pin_url = f"https://pinterest.com{href}"
                img_url = div_card.find_element(By.CSS_SELECTOR, "img").get_attribute("src")

                search_data = {
                    "name": title,
                    "url": pin_url,
                    "image": img_url
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {search_url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Without this increment a failing page would be retried forever.
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":

    # Run settings. MAX_THREADS is defined here but not used by this listing.
    MAX_RETRIES, MAX_THREADS, LOCATION = 3, 5, "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # Dash-separated form of the keyword; computed but not used by this listing.
        filename = "-".join(keyword.split(" "))

        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info("Crawl complete.")
In the example above, pay attention to our parsing logic:
- `is_card = div_card.get_attribute("data-grid-item")` determines whether or not each div is a search result. All search results contain the attribute `data-grid-item`.
- We pull the title with `a_element.get_attribute("aria-label")`.
- We find the URL of each pin with `a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")`, which strips the ScrapeOps host from the link; we then prepend Pinterest's domain to rebuild the pin URL.
- Once we've parsed through all these results, we set `success` to `True` and exit the function.
Step 2: Storing the Scraped Data
Pulling the right data doesn't do us much good if we can't store it. To store our data, we'll be adding both a SearchData
class, and a DataPipeline
class.
- `SearchData` simply takes our data and turns it into a uniform object that holds it.
- Once we have our `SearchData`, we can then pass it into the `DataPipeline`, which filters out our duplicates and saves all of our relevant information to a CSV file.
Take a look at our updated code now:
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep
# Browser options shared by every Chrome instance this script launches.
OPTIONS = webdriver.ChromeOptions()
# Content-setting value 2 turns JavaScript off in the launched browser.
prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
# Chrome only recognizes the switch when it is spelled "--user-agent=...";
# a bare "useragent=..." argument is not a switch and leaves the default UA in place.
OPTIONS.add_argument(f"--user-agent={user_agent}")

# The ScrapeOps API key is read from config.json in the working directory.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One pin found on a Pinterest search results page.

    Text fields are normalized right after construction: missing values
    become ``"No <field name>"`` and everything else is stripped of
    surrounding whitespace.
    """

    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace missing text fields with a placeholder and strip the rest."""
        # Named "spec" so the loop does not shadow dataclasses.field.
        for spec in fields(self):
            value = getattr(self, spec.name)
            # Selenium's get_attribute() returns None for an absent attribute;
            # treat that the same as an empty string for text fields.
            if value is None and spec.type in (str, "str"):
                value = ""
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, spec.name, f"No {spec.name}")
                continue
            # Strip any trailing spaces, etc.
            setattr(self, spec.name, value.strip())
class DataPipeline:
    """Collect scraped dataclass items, drop repeats by name, and append them to a CSV file.

    Items are buffered in ``storage_queue`` and written out whenever the
    buffer reaches ``storage_queue_limit`` or the pipeline is closed.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file and empty the queue.

        A header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag, including on the empty-queue early return;
            # a flag left at True would block every later automatic flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with the same ``name`` was already seen; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a new item unless it is a duplicate, flushing once the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write out whatever is still queued."""
        if self.csv_file_open:
            # The file imports `sleep` directly (from time import sleep);
            # the `time` module itself is never imported.
            sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Crawl one Pinterest search page and store every pin card found.

    Args:
        keyword: Search phrase; spaces are converted to "+" for the query string.
        location: Accepted for interface parity; not used by this version.
        data_pipeline: Object exposing ``add_data()``; receives one
            ``SearchData`` per pin card. When omitted, parsed pins are only logged.
        retries: Number of additional attempts made after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    # Kept separate from the per-pin URLs built below so that the request and
    # the log lines always refer to the search page itself.
    search_url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        driver.set_page_load_timeout(30)
        driver.implicitly_wait(10)
        try:
            driver.get(search_url)
            logger.info(f"Fetched {search_url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div")
            print("found div cards:", len(div_cards))

            for div_card in div_cards:
                # Only divs with a truthy data-grid-item attribute are search results.
                if not div_card.get_attribute("data-grid-item"):
                    continue

                a_element = div_card.find_element(By.CSS_SELECTOR, "a")
                title = a_element.get_attribute("aria-label")
                # Drop the proxy host from the link, then rebuild it on Pinterest's domain.
                href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
                pin_url = f"https://pinterest.com{href}"
                img_url = div_card.find_element(By.CSS_SELECTOR, "img").get_attribute("src")

                search_data = SearchData(
                    name=title,
                    url=pin_url,
                    image=img_url
                )
                if data_pipeline is not None:
                    data_pipeline.add_data(search_data)
                else:
                    logger.info(f"Parsed (no pipeline configured): {search_data}")

            logger.info(f"Successfully parsed data from: {search_url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Without this increment a failing page would be retried forever.
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":

    # Run settings: retry budget per page, worker thread count (not used by
    # this listing), and the country code passed to the crawler.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        try:
            scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        finally:
            # Flush whatever was queued even when the crawl gives up, so
            # already-parsed rows are not lost with the exception.
            crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")
Quick recap of our changes:
- Instead of turning our `search_data` into a `dict`, we use it to build a `SearchData` object.
- Once we have our `search_data`, we pass it into the `data_pipeline`.
- After this operation is complete and we've exited `scrape_search_results()`, we close the pipeline.
Step 3: Bypassing Anti-Bots
As you may have noticed in our earlier examples, we're not running in headless mode and we have JavaScript turned off.
The reason for this: we need ScrapeOps to render the page (rendering in headless mode sometimes causes issues with Selenium), and we can't execute the JavaScript that Pinterest uses for authentication/fetching more content... it will destroy the page.
The options below show our ChromeOptions
. In the prefs
, you should see "profile.managed_default_content_settings.javascript": 2
. This turns off JavaScript support.
# Browser options shared by every Chrome instance this script launches.
OPTIONS = webdriver.ChromeOptions()
# Content-setting value 2 turns JavaScript off in the launched browser.
prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
# Chrome only recognizes the switch when it is spelled "--user-agent=...";
# a bare "useragent=..." argument is not a switch and leaves the default UA in place.
OPTIONS.add_argument(f"--user-agent={user_agent}")
Here is the function we'll be using for proxy support:
def get_scrapeops_url(url, location="us"):
    """Wrap a target URL in a ScrapeOps proxy request URL.

    Args:
        url: The page to fetch through the proxy.
        location: Value sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the API key, target URL, country and a
        ``wait`` of 2000 encoded into its query string.
    """
    query = urlencode(
        {"api_key": API_KEY, "url": url, "country": location, "wait": 2000}
    )
    return "https://proxy.scrapeops.io/v1/?" + query
Here is our production ready crawler:
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep
# Browser options shared by every Chrome instance this script launches.
OPTIONS = webdriver.ChromeOptions()
# Content-setting value 2 turns JavaScript off in the launched browser.
prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
# Chrome only recognizes the switch when it is spelled "--user-agent=...";
# a bare "useragent=..." argument is not a switch and leaves the default UA in place.
OPTIONS.add_argument(f"--user-agent={user_agent}")

# The ScrapeOps API key is read from config.json in the working directory.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Wrap a target URL in a ScrapeOps proxy request URL.

    Args:
        url: The page to fetch through the proxy.
        location: Value sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the API key, target URL, country and a
        ``wait`` of 2000 encoded into its query string.
    """
    query = urlencode(
        {"api_key": API_KEY, "url": url, "country": location, "wait": 2000}
    )
    return "https://proxy.scrapeops.io/v1/?" + query
## Logging
# Configure the root logger at INFO level and create this module's logger,
# which the functions below use for progress and error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One pin found on a Pinterest search results page.

    Text fields are normalized right after construction: missing values
    become ``"No <field name>"`` and everything else is stripped of
    surrounding whitespace.
    """

    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Replace missing text fields with a placeholder and strip the rest."""
        # Named "spec" so the loop does not shadow dataclasses.field.
        for spec in fields(self):
            value = getattr(self, spec.name)
            # Selenium's get_attribute() returns None for an absent attribute;
            # treat that the same as an empty string for text fields.
            if value is None and spec.type in (str, "str"):
                value = ""
            if not isinstance(value, str):
                continue
            if value == "":
                setattr(self, spec.name, f"No {spec.name}")
                continue
            # Strip any trailing spaces, etc.
            setattr(self, spec.name, value.strip())
class DataPipeline:
    """Collect scraped dataclass items, drop repeats by name, and append them to a CSV file.

    Items are buffered in ``storage_queue`` and written out whenever the
    buffer reaches ``storage_queue_limit`` or the pipeline is closed.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file and empty the queue.

        A header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Always clear the flag, including on the empty-queue early return;
            # a flag left at True would block every later automatic flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with the same ``name`` was already seen; otherwise record it."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a new item unless it is a duplicate, flushing once the queue is full."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write out whatever is still queued."""
        if self.csv_file_open:
            # The file imports `sleep` directly (from time import sleep);
            # the `time` module itself is never imported.
            sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Crawl one Pinterest search page through the proxy and store every pin card found.

    Args:
        keyword: Search phrase; spaces are converted to "+" for the query string.
        location: Country code forwarded to ``get_scrapeops_url``.
        data_pipeline: Object exposing ``add_data()``; receives one
            ``SearchData`` per pin card. When omitted, parsed pins are only logged.
        retries: Number of additional attempts made after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    # Kept separate from the per-pin URLs built below so that the request and
    # the log lines always refer to the search page itself.
    search_url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        driver.set_page_load_timeout(30)
        driver.implicitly_wait(10)
        try:
            scrapeops_proxy_url = get_scrapeops_url(search_url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {search_url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div")
            print("found div cards:", len(div_cards))

            for div_card in div_cards:
                # Only divs with a truthy data-grid-item attribute are search results.
                if not div_card.get_attribute("data-grid-item"):
                    continue

                a_element = div_card.find_element(By.CSS_SELECTOR, "a")
                title = a_element.get_attribute("aria-label")
                # Drop the proxy host from the link, then rebuild it on Pinterest's domain.
                href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
                pin_url = f"https://pinterest.com{href}"
                img_url = div_card.find_element(By.CSS_SELECTOR, "img").get_attribute("src")

                search_data = SearchData(
                    name=title,
                    url=pin_url,
                    image=img_url
                )
                if data_pipeline is not None:
                    data_pipeline.add_data(search_data)
                else:
                    logger.info(f"Parsed (no pipeline configured): {search_data}")

            logger.info(f"Successfully parsed data from: {search_url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Without this increment a failing page would be retried forever.
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":

    # Run settings: retry budget per page, worker thread count (not used by
    # this listing), and the country code sent to the proxy.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        try:
            scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        finally:
            # Flush whatever was queued even when the crawl gives up, so
            # already-parsed rows are not lost with the exception.
            crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")
Step 4: Production Run
Now it's time to run it in production and test it out. Here is our main
.
if __name__ == "__main__":

    # Run settings: retry budget per page, worker thread count (not used by
    # this listing), and the country code sent to the proxy.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        try:
            scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        finally:
            # Flush whatever was queued even when the crawl gives up, so
            # already-parsed rows are not lost with the exception.
            crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")
If you're following along, feel free to tweak these constants any way you'd like and experiment with the results.
Here are the results from our production run.
It took 16.68 seconds to parse the page and save the results. Your results will probably vary based on your hardware, internet connection, and the location of your server.
Build A Pinterest Scraper
When we build our pin scraper, we need to incorporate the following things into our design:
- Parse the information from a pin.
- Read the rows from the CSV file.
- Store the data we extracted when parsing.
- Perform all these actions concurrently.
- Integrate with the ScrapeOps Proxy API
Step 1: Create Simple Data Parser
Just like before, we'll get started with a simple parsing function. Take a look at the snippet below: while the parsing logic looks different, the overall structure is virtually identical.
def process_pin(row, location, retries=3):
    """Scrape one pin page and print the details that were found.

    Args:
        row: Mapping with at least ``"url"`` (the pin page to open).
        location: Accepted for interface parity; not used by this version.
        retries: Number of additional attempts made after the first failure.

    Raises:
        Exception: If no attempt succeeds.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            # The page load lives inside the try so that a failed load still
            # closes the browser and counts as one attempt.
            driver.get(url)

            main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']")

            website = "n/a"
            website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']")
            if website_holder:
                website = f"https://{website_holder[0].text}"

            # Each full star is its own element, so the count is the rating.
            star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']")
            account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div")
            account_name = nested_divs[0].get_attribute("title")
            # The profile block's text holds the account name and the follower
            # figure; remove the name and the " followers" label to leave the figure.
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img = "n/a"
            img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']")
            if img_container:
                img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src")

            pin_data = {
                "name": account_name,
                "website": website,
                "stars": stars,
                "follower_count": follower_count,
                "image": img
            }
            print(pin_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
Key points from above:
- We use `find_elements()` to look for `website_holder`.
- If `website_holder` is not empty, there is a website present, so we reassign the website from `"n/a"` to the actual website.
- We find all the `star_divs`. Each star is a unique element on the page, so we can obtain the rating by counting these elements.
- We then find the `follower_count`, `account_name` and the `image` of the pin.
Step 2: Loading URLs To Scrape
Our parsing function doesn't do much good if our scraper doesn't know what to parse.
- This function uses `csv.DictReader()` to read the file into an array.
- We then pass each row from the array into `process_pin()`.
- Later on, we'll add concurrency to this function, but for now, we'll use a `for` loop as a placeholder.
def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and scrape every listed pin, one after another.

    Args:
        csv_file: Path of the CSV file produced by the crawl.
        location: Passed through to ``process_pin`` for each row.
        max_threads: Accepted but not used by this sequential version.
        retries: Forwarded to ``process_pin`` as its ``retries`` keyword.
    """
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as csv_handle:
        # Materialize all rows up front so each one is a plain dict.
        records = []
        records.extend(csv.DictReader(csv_handle))

        for record in records:
            process_pin(record, location, retries=retries)
Here is our fully updated code:
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep
# Chrome options shared by every webdriver instance created in this script.
OPTIONS = webdriver.ChromeOptions()

# Content-setting value 2 means "block" in Chrome, so JavaScript is disabled.
prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
# Chrome only honours command-line switches written as "--name=value";
# without the leading dashes the custom user agent is not applied.
OPTIONS.add_argument(f"--user-agent={user_agent}")

API_KEY = ""

# The ScrapeOps API key is read from config.json in the working directory.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Build the ScrapeOps proxy URL that fetches ``url`` on our behalf.

    Args:
        url: Address of the page to request through the proxy.
        location: Value sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the API key, target URL, country and a
        ``wait`` value of 2000 URL-encoded into its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    # The resulting URL embeds the API key, so it is deliberately not printed.
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
## Logging
# Configure the root logger at INFO level and create this module's logger,
# which the functions and classes below use for all status messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One result card collected from a search page.

    String attributes are normalized right after construction: surrounding
    whitespace is stripped and blank values become ``"No <field name>"``.
    """
    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # The loop variable is not called ``field`` so that it does not
        # shadow the ``field`` helper imported from dataclasses.
        for attr in fields(self):
            value = getattr(self, attr.name)
            # Non-string values are left untouched.
            if not isinstance(value, str):
                continue
            # Strip before testing for emptiness so whitespace-only text
            # also receives the placeholder instead of being stored as "".
            cleaned = value.strip()
            setattr(self, attr.name, cleaned or f"No {attr.name}")
class DataPipeline:
    """Buffer scraped dataclass items, drop duplicates and append them to a CSV.

    Items are queued in memory and written to ``csv_filename`` whenever the
    queue reaches ``storage_queue_limit`` and again when the pipeline closes.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file, adding a header if needed."""
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [attr.name for attr in fields(data_to_save[0])]
            # Only write the header when the file is new or still empty.
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag on every exit path (empty queue, error, success);
            # otherwise add_data() would never trigger another flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with the same ``name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued to the CSV file."""
        if self.csv_file_open:
            # Only ``sleep`` is imported from ``time`` in this file, so it is
            # called directly; ``time.sleep`` would raise NameError.
            sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Crawl one Pinterest search page and queue every result card found.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` in the query.
        location: Forwarded to ``get_scrapeops_url`` as the proxy location.
        data_pipeline: Object whose ``add_data`` receives one ``SearchData``
            per result card.
        retries: Number of additional attempts made after a failed one.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    # Kept in its own variable so log messages always name the search page,
    # not the last pin that happened to be parsed.
    search_url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.set_page_load_timeout(30)
            driver.implicitly_wait(10)
            scrapeops_proxy_url = get_scrapeops_url(search_url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {search_url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div")
            print("found div cards:", len(div_cards))

            for div_card in div_cards:
                # Only divs carrying a data-grid-item attribute are result cards.
                if not div_card.get_attribute("data-grid-item"):
                    continue

                a_element = div_card.find_element(By.CSS_SELECTOR, "a")
                title = a_element.get_attribute("aria-label")
                # Remove the proxy host from the link before prefixing the
                # Pinterest domain.
                href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
                pin_url = f"https://pinterest.com{href}"
                img = div_card.find_element(By.CSS_SELECTOR, "img")
                img_url = img.get_attribute("src")

                search_data = SearchData(
                    name=title,
                    url=pin_url,
                    image=img_url
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {search_url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Count the failed attempt; without this the loop never terminates
            # when the page keeps failing.
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def process_pin(row, location, retries=3):
    """Scrape one pin page and print the details that were extracted.

    Args:
        row: Mapping with at least a ``"url"`` key holding the pin page URL.
        location: Accepted for interface compatibility; not used here because
            the page is requested directly.
        retries: Number of additional attempts made after a failed one.

    Raises:
        Exception: If no attempt succeeds.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            # Navigate inside the try block so a failed page load is logged,
            # counted as a try and still reaches driver.quit() below.
            driver.get(url)
            main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']")

            website = "n/a"
            website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']")
            if website_holder:
                website = f"https://{website_holder[0].text}"

            # Every full star is its own element, so the count is the rating.
            star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']")
            account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div")
            account_name = nested_divs[0].get_attribute("title")
            # Remove the account name and the " followers" label from the
            # block's text so only the count remains.
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img = "n/a"
            img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']")
            if img_container:
                img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src")

            pin_data = {
                "name": account_name,
                "website": website,
                "stars": stars,
                "follower_count": follower_count,
                "image": img
            }
            print(pin_data)
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and scrape every listed pin, one after another.

    Args:
        csv_file: Path of the CSV file produced by the crawl.
        location: Passed through to ``process_pin`` for each row.
        max_threads: Accepted but not used by this sequential version.
        retries: Forwarded to ``process_pin`` as its ``retries`` keyword.
    """
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as csv_handle:
        # Materialize all rows up front so each one is a plain dict.
        records = []
        records.extend(csv.DictReader(csv_handle))

        for record in records:
            process_pin(record, location, retries=retries)
if __name__ == "__main__":
    # Run settings: retry budget, worker count and proxy location.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with dashes for spaces.
        csv_name = keyword.replace(" ", "-") + ".csv"
        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info("Crawl complete.")

    # Second pass: scrape every pin listed in each crawl file.
    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
Time to store our data... sound familiar? In this section we'll create another dataclass, `PinData`. Take a look below: our `PinData` is actually very similar to `SearchData`.
This object will even be passed into the `DataPipeline` the same way.
@dataclass
class PinData:
    """Details collected for a single pin.

    String attributes are normalized right after construction: surrounding
    whitespace is stripped and blank values become ``"No <field name>"``.
    The integer ``stars`` field is left unchanged.
    """
    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # The loop variable is not called ``field`` so that it does not
        # shadow the ``field`` helper imported from dataclasses.
        for attr in fields(self):
            value = getattr(self, attr.name)
            # Non-string values (such as ``stars``) are left untouched.
            if not isinstance(value, str):
                continue
            # Strip before testing for emptiness so whitespace-only text
            # also receives the placeholder instead of being stored as "".
            cleaned = value.strip()
            setattr(self, attr.name, cleaned or f"No {attr.name}")
Now, let's update our script:
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep
# Chrome options shared by every webdriver instance created in this script.
OPTIONS = webdriver.ChromeOptions()

# Content-setting value 2 means "block" in Chrome, so JavaScript is disabled.
prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
# Chrome only honours command-line switches written as "--name=value";
# without the leading dashes the custom user agent is not applied.
OPTIONS.add_argument(f"--user-agent={user_agent}")

API_KEY = ""

# The ScrapeOps API key is read from config.json in the working directory.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Build the ScrapeOps proxy URL that fetches ``url`` on our behalf.

    Args:
        url: Address of the page to request through the proxy.
        location: Value sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the API key, target URL, country and a
        ``wait`` value of 2000 URL-encoded into its query string.
    """
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    # The resulting URL embeds the API key, so it is deliberately not printed.
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
## Logging
# Configure the root logger at INFO level and create this module's logger,
# which the functions and classes below use for all status messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One result card collected from a search page.

    String attributes are normalized right after construction: surrounding
    whitespace is stripped and blank values become ``"No <field name>"``.
    """
    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # The loop variable is not called ``field`` so that it does not
        # shadow the ``field`` helper imported from dataclasses.
        for attr in fields(self):
            value = getattr(self, attr.name)
            # Non-string values are left untouched.
            if not isinstance(value, str):
                continue
            # Strip before testing for emptiness so whitespace-only text
            # also receives the placeholder instead of being stored as "".
            cleaned = value.strip()
            setattr(self, attr.name, cleaned or f"No {attr.name}")
@dataclass
class PinData:
    """Details collected for a single pin.

    String attributes are normalized right after construction: surrounding
    whitespace is stripped and blank values become ``"No <field name>"``.
    The integer ``stars`` field is left unchanged.
    """
    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # The loop variable is not called ``field`` so that it does not
        # shadow the ``field`` helper imported from dataclasses.
        for attr in fields(self):
            value = getattr(self, attr.name)
            # Non-string values (such as ``stars``) are left untouched.
            if not isinstance(value, str):
                continue
            # Strip before testing for emptiness so whitespace-only text
            # also receives the placeholder instead of being stored as "".
            cleaned = value.strip()
            setattr(self, attr.name, cleaned or f"No {attr.name}")
class DataPipeline:
    """Buffer scraped dataclass items, drop duplicates and append them to a CSV.

    Items are queued in memory and written to ``csv_filename`` whenever the
    queue reaches ``storage_queue_limit`` and again when the pipeline closes.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file, adding a header if needed."""
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [attr.name for attr in fields(data_to_save[0])]
            # Only write the header when the file is new or still empty.
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag on every exit path (empty queue, error, success);
            # otherwise add_data() would never trigger another flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with the same ``name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued to the CSV file."""
        if self.csv_file_open:
            # Only ``sleep`` is imported from ``time`` in this file, so it is
            # called directly; ``time.sleep`` would raise NameError.
            sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Crawl one Pinterest search page and queue every result card found.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` in the query.
        location: Forwarded to ``get_scrapeops_url`` as the proxy location.
        data_pipeline: Object whose ``add_data`` receives one ``SearchData``
            per result card.
        retries: Number of additional attempts made after a failed one.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    # Kept in its own variable so log messages always name the search page,
    # not the last pin that happened to be parsed.
    search_url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.set_page_load_timeout(30)
            driver.implicitly_wait(10)
            scrapeops_proxy_url = get_scrapeops_url(search_url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {search_url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div")
            print("found div cards:", len(div_cards))

            for div_card in div_cards:
                # Only divs carrying a data-grid-item attribute are result cards.
                if not div_card.get_attribute("data-grid-item"):
                    continue

                a_element = div_card.find_element(By.CSS_SELECTOR, "a")
                title = a_element.get_attribute("aria-label")
                # Remove the proxy host from the link before prefixing the
                # Pinterest domain.
                href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
                pin_url = f"https://pinterest.com{href}"
                img = div_card.find_element(By.CSS_SELECTOR, "img")
                img_url = img.get_attribute("src")

                search_data = SearchData(
                    name=title,
                    url=pin_url,
                    image=img_url
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {search_url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Count the failed attempt; without this the loop never terminates
            # when the page keeps failing.
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def process_pin(row, location, retries=3):
    """Scrape one pin page and store the extracted details in its own CSV.

    The output file is named after the first 20 characters of ``row["name"]``
    with spaces replaced by dashes.

    Args:
        row: Mapping with ``"url"`` (pin page URL) and ``"name"`` keys.
        location: Accepted for interface compatibility; not used here because
            the page is requested directly.
        retries: Number of additional attempts made after a failed one.

    Raises:
        Exception: If no attempt succeeds.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            # Navigate inside the try block so a failed page load is logged,
            # counted as a try and still reaches driver.quit() below.
            driver.get(url)
            main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']")
            pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv")

            website = "n/a"
            website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']")
            if website_holder:
                website = f"https://{website_holder[0].text}"

            # Every full star is its own element, so the count is the rating.
            star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']")
            account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div")
            account_name = nested_divs[0].get_attribute("title")
            # Remove the account name and the " followers" label from the
            # block's text so only the count remains.
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img = "n/a"
            img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']")
            if img_container:
                img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src")

            pin_data = PinData(
                name=account_name,
                website=website,
                stars=stars,
                follower_count=follower_count,
                image=img
            )
            pin_pipeline.add_data(pin_data)
            pin_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and scrape every listed pin, one after another.

    Args:
        csv_file: Path of the CSV file produced by the crawl.
        location: Passed through to ``process_pin`` for each row.
        max_threads: Accepted but not used by this sequential version.
        retries: Forwarded to ``process_pin`` as its ``retries`` keyword.
    """
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as csv_handle:
        # Materialize all rows up front so each one is a plain dict.
        records = []
        records.extend(csv.DictReader(csv_handle))

        for record in records:
            process_pin(record, location, retries=retries)
if __name__ == "__main__":
    # Run settings: retry budget, worker count and proxy location.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with dashes for spaces.
        csv_name = keyword.replace(" ", "-") + ".csv"
        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info("Crawl complete.")

    # Second pass: scrape every pin listed in each crawl file.
    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
The main differences in this version of the script are:
- Inside of `process_pin()` we instantiate a `DataPipeline` object.
- Once we've parsed our data, we use it to create a `PinData` object.
- We pass the `pin_data` variable into the `pin_pipeline`.
- After we've finished using the pipeline, we close it and exit the function.
Step 4: Adding Concurrency
Efficiency and speed are key when doing any large task at scale. To make our scraper faster and more efficient, we're going to add concurrency with `ThreadPoolExecutor`.
Here we make a simple, but big change to `process_results()`.
def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and scrape the listed pins on a thread pool.

    Args:
        csv_file: Path of the CSV file produced by the crawl.
        location: Passed to ``process_pin`` for every row.
        max_threads: Maximum number of worker threads in the pool.
        retries: Passed to ``process_pin`` for every row.
    """
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as csv_handle:
        rows = list(csv.DictReader(csv_handle))

    # executor.map pairs arguments positionally, so the scalar settings are
    # repeated once per row.
    row_count = len(rows)
    locations = [location] * row_count
    retry_limits = [retries] * row_count

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # NOTE(review): the iterator returned by map() is never consumed, so
        # exceptions raised by process_pin are not re-raised here.
        executor.map(process_pin, rows, locations, retry_limits)
Time to explain the arguments to `executor.map()`:
- `process_pin` is the function we wish to run on multiple threads.
- `reader` is the array of objects we want to pass into the function.
- We also pass in our `location` and our `retries` as arrays.
Step 5: Bypassing Anti-Bots
We're almost ready for production, but before we can run our scraper, we need to once again add in proxy support. Since we've already got our get_scrapeops_url()
function, we just need to add it into one line.
driver.get(get_scrapeops_url(url, location=location))
Here is our production ready code containing both the crawler and the scraper:
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep
# Chrome options shared by every webdriver instance created in this script.
OPTIONS = webdriver.ChromeOptions()

# Content-setting value 2 means "block" in Chrome, so JavaScript is disabled.
prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
# Chrome only honours command-line switches written as "--name=value";
# without the leading dashes the custom user agent is not applied.
OPTIONS.add_argument(f"--user-agent={user_agent}")

API_KEY = ""

# The ScrapeOps API key is read from config.json in the working directory.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Return the ScrapeOps proxy address that will fetch ``url`` for us.

    Args:
        url: Address of the page to request through the proxy.
        location: Value sent as the proxy's ``country`` parameter.

    Returns:
        The proxy endpoint with the API key, target URL, country and a
        ``wait`` value of 2000 URL-encoded into its query string.
    """
    query_string = urlencode({
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000,
    })
    return f"https://proxy.scrapeops.io/v1/?{query_string}"
## Logging
# Configure the root logger at INFO level and create this module's logger,
# which the functions and classes below use for all status messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One result card collected from a search page.

    String attributes are normalized right after construction: surrounding
    whitespace is stripped and blank values become ``"No <field name>"``.
    """
    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # The loop variable is not called ``field`` so that it does not
        # shadow the ``field`` helper imported from dataclasses.
        for attr in fields(self):
            value = getattr(self, attr.name)
            # Non-string values are left untouched.
            if not isinstance(value, str):
                continue
            # Strip before testing for emptiness so whitespace-only text
            # also receives the placeholder instead of being stored as "".
            cleaned = value.strip()
            setattr(self, attr.name, cleaned or f"No {attr.name}")
@dataclass
class PinData:
    """Details collected for a single pin.

    String attributes are normalized right after construction: surrounding
    whitespace is stripped and blank values become ``"No <field name>"``.
    The integer ``stars`` field is left unchanged.
    """
    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Strip every string field and give blank ones a placeholder."""
        # The loop variable is not called ``field`` so that it does not
        # shadow the ``field`` helper imported from dataclasses.
        for attr in fields(self):
            value = getattr(self, attr.name)
            # Non-string values (such as ``stars``) are left untouched.
            if not isinstance(value, str):
                continue
            # Strip before testing for emptiness so whitespace-only text
            # also receives the placeholder instead of being stored as "".
            cleaned = value.strip()
            setattr(self, attr.name, cleaned or f"No {attr.name}")
class DataPipeline:
    """Buffer scraped dataclass items, drop duplicates and append them to a CSV.

    Items are queued in memory and written to ``csv_filename`` whenever the
    queue reaches ``storage_queue_limit`` and again when the pipeline closes.
    """

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append all queued items to the CSV file, adding a header if needed."""
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [attr.name for attr in fields(data_to_save[0])]
            # Only write the header when the file is new or still empty.
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                for item in data_to_save:
                    writer.writerow(asdict(item))
        finally:
            # Reset the flag on every exit path (empty queue, error, success);
            # otherwise add_data() would never trigger another flush.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if an item with the same ``name`` was already seen."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a non-duplicate item and flush once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued to the CSV file."""
        if self.csv_file_open:
            # Only ``sleep`` is imported from ``time`` in this file, so it is
            # called directly; ``time.sleep`` would raise NameError.
            sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    """Crawl one Pinterest search page and queue every result card found.

    Args:
        keyword: Search phrase; spaces are replaced with ``+`` in the query.
        location: Forwarded to ``get_scrapeops_url`` as the proxy location.
        data_pipeline: Object whose ``add_data`` receives one ``SearchData``
            per result card.
        retries: Number of additional attempts made after a failed one.

    Raises:
        Exception: If no attempt succeeds.
    """
    formatted_keyword = keyword.replace(" ", "+")
    # Kept in its own variable so log messages always name the search page,
    # not the last pin that happened to be parsed.
    search_url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            driver.set_page_load_timeout(30)
            driver.implicitly_wait(10)
            scrapeops_proxy_url = get_scrapeops_url(search_url, location=location)
            driver.get(scrapeops_proxy_url)
            logger.info(f"Fetched {search_url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div")
            print("found div cards:", len(div_cards))

            for div_card in div_cards:
                # Only divs carrying a data-grid-item attribute are result cards.
                if not div_card.get_attribute("data-grid-item"):
                    continue

                a_element = div_card.find_element(By.CSS_SELECTOR, "a")
                title = a_element.get_attribute("aria-label")
                # Remove the proxy host from the link before prefixing the
                # Pinterest domain.
                href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
                pin_url = f"https://pinterest.com{href}"
                img = div_card.find_element(By.CSS_SELECTOR, "img")
                img_url = img.get_attribute("src")

                search_data = SearchData(
                    name=title,
                    url=pin_url,
                    image=img_url
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {search_url}")
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {search_url}: {e}")
            logger.info(f"Retrying request for page: {search_url}, retries left {retries-tries}")
            # Count the failed attempt; without this the loop never terminates
            # when the page keeps failing.
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def process_pin(row, location, retries=3):
    """Scrape one pin page through the proxy and store the details in a CSV.

    The output file is named after the first 20 characters of ``row["name"]``
    with spaces replaced by dashes.

    Args:
        row: Mapping with ``"url"`` (pin page URL) and ``"name"`` keys.
        location: Forwarded to ``get_scrapeops_url`` as the proxy location.
        retries: Number of additional attempts made after a failed one.

    Raises:
        Exception: If no attempt succeeds.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        driver = webdriver.Chrome(options=OPTIONS)
        try:
            # Navigate inside the try block so a failed page load is logged,
            # counted as a try and still reaches driver.quit() below.
            driver.get(get_scrapeops_url(url, location=location))
            main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']")
            pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv")

            website = "n/a"
            website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']")
            if website_holder:
                website = f"https://{website_holder[0].text}"

            # Every full star is its own element, so the count is the rating.
            star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']")
            stars = len(star_divs)

            profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']")
            account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']")
            nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div")
            account_name = nested_divs[0].get_attribute("title")
            # Remove the account name and the " followers" label from the
            # block's text so only the count remains.
            follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

            img = "n/a"
            img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']")
            if img_container:
                img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src")

            pin_data = PinData(
                name=account_name,
                website=website,
                stars=stars,
                follower_count=follower_count,
                image=img
            )
            pin_pipeline.add_data(pin_data)
            pin_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and scrape the listed pins on a thread pool.

    Args:
        csv_file: Path of the CSV file produced by the crawl.
        location: Passed to ``process_pin`` for every row.
        max_threads: Maximum number of worker threads in the pool.
        retries: Passed to ``process_pin`` for every row.
    """
    logger.info(f"processing {csv_file}")

    with open(csv_file, newline="") as csv_handle:
        rows = list(csv.DictReader(csv_handle))

    # executor.map pairs arguments positionally, so the scalar settings are
    # repeated once per row.
    row_count = len(rows)
    locations = [location] * row_count
    retry_limits = [retries] * row_count

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # NOTE(review): the iterator returned by map() is never consumed, so
        # exceptions raised by process_pin are not re-raised here.
        executor.map(process_pin, rows, locations, retry_limits)
if __name__ == "__main__":
    # Run settings: retry budget, worker count and proxy location.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with dashes for spaces.
        csv_name = keyword.replace(" ", "-") + ".csv"
        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info("Crawl complete.")

    # Second pass: scrape every pin listed in each crawl file.
    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Time to run the whole thing in production. Feel free to take a look at the main
again and tweak whatever constants you'd like.
if __name__ == "__main__":
    # Run settings: retry budget, worker count and proxy location.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        # One CSV per keyword, named after the keyword with dashes for spaces.
        csv_name = keyword.replace(" ", "-") + ".csv"
        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info("Crawl complete.")

    # Second pass: scrape every pin listed in each crawl file.
    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here are the results:
The operation finished in roughly 103 seconds. This comes out to an average of about 4.68 seconds per page. Considering that we're waiting 2 seconds for the server to return the page, this is pretty decent.
Legal and Ethical Considerations
Whenever you scrape a website, you need to be aware of its Terms of Service and `robots.txt`. You can view Pinterest's terms here.
If you access private data on their site in a way that violates these terms, you can even lose your Pinterest account! You can view their `robots.txt` here.
Also, keep in mind whether you are scraping public data. Private data (data behind a login), can often be illegal to scrape. Generally, public data (data not behind a login) is public information and therefore fair game when scraping.
If you are unsure of the legality of your scraper, it is best to consult an attorney based in your jurisdiction.
Conclusion
Congratulations! You've finished this tutorial. Take your new knowledge of Selenium, CSS selectors, parsing, proxy integration, data storage and build something!
If you'd like to know more about the tech stack used in this article, take a look at the links below.
More Web Scraping Guides
Here at ScrapeOps, we've got tons of useful content.
Check out our Selenium Web Scraping Playbook or read one of the articles below and level up your scraping skills!!!