How to Scrape Pinterest With Requests and BeautifulSoup
For years, Pinterest has been the go-to for all things creative on the internet. Whether you're looking for interesting recipes, decorating ideas, or anything else, Pinterest is a great place to go! Along with all of this, Pinterest is also a social network, which means we can scrape valuable data such as account names, follower counts, and more.
In this guide, we'll go over the following topics:
- TLDR: How to Scrape Pinterest
- How To Architect Our Scraper
- Understanding How To Scrape Pinterest
- Setting Up Our Pinterest Scraper
- Build A Pinterest Search Crawler
- Build A Pinterest Scraper
- Legal and Ethical Considerations
- Conclusion
- More Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Pinterest
If you're looking for a Pinterest scraper and you don't have time to read the article, we've got one for you right here. To use this code, create a config.json file with your "api_key" and place it in the same folder as this scraper. At that point, it's ready to go!!!
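For reference, config.json only needs to contain your ScrapeOps API key, e.g. {"api_key": "YOUR-SCRAPEOPS-API-KEY"} (the value shown here is just a placeholder).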
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
print(scrapeops_proxy_url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div")
result_count = 0
for div_card in div_cards:
if div_card.get("data-grid-item"):
result_count += 1
title = div_card.text
a_element = div_card.find("a")
url = f"https://pinterest.com{a_element['href']}"
img = div_card.find("img")
img_url = img["src"]
search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_pin(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[data-test-id='CloseupDetails']")
website = "n/a"
has_website = main_card.select_one("span[style='text-decoration: underline;']")
if has_website:
website = f"https://{has_website.text}"
star_divs = main_card.select("div[data-test-id='rating-star-full']")
stars = len(star_divs)
profile_info = main_card.select_one("div[data-test-id='follower-count']")
account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
nested_divs = account_name_div.find_all("div")
account_name = nested_divs[0].get("title")
follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")
img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
img = img_container.find("img").get("src")
pin_data = {
"name": account_name,
"website": website,
"stars": stars,
"follower_count": follower_count,
"image": img
}
print(pin_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_pin,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Feel free to tweak it as you need. You can change any of the following:
- MAX_RETRIES: the maximum number of attempts the script will make to fetch data from a URL if the initial request fails.
- MAX_THREADS: the maximum number of threads used to process results concurrently. This can speed up the processing of multiple pins or search results.
- LOCATION: the geographical location the requests are made from. It can affect the content returned by the website due to region-specific restrictions or differences.
- keyword_list: the list of keywords for which you want to scrape Pinterest search results.
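For example, a run that searches two keywords from the US with more threads might set the constants at the top of the main block like this (the values are just illustrative):

MAX_RETRIES = 5                                  # retry failed requests up to 5 times
MAX_THREADS = 10                                 # process up to 10 pins at once
LOCATION = "us"                                  # route requests through US-based servers
keyword_list = ["grilling", "camping recipes"]   # one output CSV per keyword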
How To Architect Our Pinterest Scraper
Our scraper is going to utilize parsing, data storage, concurrency, and proxy integration. When we use a headless browser such as Selenium or Puppeteer, we have the ability to interact with the page and render JavaScript.
With Requests/BeautifulSoup, we don't get this luxury, so we'll be using the ScrapeOps Headless Browser to compensate for that.
In this tutorial, we'll be building both a scraper and a crawler.
Our project will utilize:
- Parsing: to extract the important data from Pinterest.
- Data Storage: to store our data for later review and to feed information into our scraper.
- Concurrency: to process multiple pages simultaneously and efficiently.
- Proxy Integration: Pinterest is notoriously difficult to access programmatically, so we'll be using the ScrapeOps Proxy API.
Understanding How To Scrape Pinterest
Step 1: How To Request Pinterest Pages
When we perform a search on Pinterest, we're making a GET request to the server. A GET request includes our base URL and some additional parameters. Take a look at the screenshot below; it shows a search for the keyword "grilling".
If you look at the address bar, our URL is:
https://www.pinterest.com/search/pins/?q=grilling&rs=typed
- Our base URL is https://www.pinterest.com/search/pins/ and our query parameters are q=grilling&rs=typed.
- rs=typed is a standard parameter that gets appended to the URL whenever you perform a search on Pinterest.
- q=grilling contains the actual keyword we're searching for (in this case, "grilling").
Individual pin pages on Pinterest are identified by a simple number. Here is a pin page from the search we performed above. As you can see, the URL is pretty simple:
https://www.pinterest.com/pin/45176802505307132/
https://www.pinterest.com/pin/ tells the server that we want a pin, and 45176802505307132 is the ID of that pin.
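As a quick illustration, here's a small sketch of how these URLs can be built in Python. The scripts in this article construct the search URL with a simple f-string and keyword.replace(" ", "+"); urlencode, shown here, is just an equivalent way of producing the same query string:

from urllib.parse import urlencode

keyword = "grilling"
# Search URL: base path plus the q and rs query parameters
search_url = "https://www.pinterest.com/search/pins/?" + urlencode({"q": keyword, "rs": "typed"})
print(search_url)  # https://www.pinterest.com/search/pins/?q=grilling&rs=typed

# Pin URL: base path plus the numeric pin ID (this ID comes from the example above)
pin_id = "45176802505307132"
print(f"https://www.pinterest.com/pin/{pin_id}/")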
Step 2: How To Extract Data From Pinterest Results and Pages
There are a couple of important things to know about extracting data from a Pinterest page.
- First, our content is all loaded via JavaScript, so we won't be able to pull the page content until it's been rendered. To handle this, we'll be passing the wait argument to the ScrapeOps API. The ScrapeOps API runs a headless browser under the hood; the wait param tells the ScrapeOps server to wait a certain amount of time for the content to render and then send the page results back to us.
- Once we've got our content, it's nested pretty deeply inside the page. Even worse, Pinterest uses dynamically generated CSS classes rather than traditional, stable class names for the page layout.
If you look below, you'll see exactly what I'm talking about.
Now, let's take a look at the pin page. Most of the important pieces of data are tagged with a data-test-id attribute. When scraping the pin page, we'll be using data-test-id to find most of our relevant information.
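Here's a minimal sketch of how those attribute selectors work with BeautifulSoup. The HTML below is simplified, made-up markup that only mimics Pinterest's data-test-id pattern:

from bs4 import BeautifulSoup

# Simplified, hypothetical markup that mimics Pinterest's data-test-id attributes
html = """
<div data-test-id="CloseupDetails">
  <div data-test-id="follower-count">12.5k followers</div>
</div>
"""

soup = BeautifulSoup(html, "html.parser")
# CSS attribute selectors let us target elements by their data-test-id value
main_card = soup.select_one("div[data-test-id='CloseupDetails']")
follower_div = main_card.select_one("div[data-test-id='follower-count']")
print(follower_div.text.strip())  # 12.5k followers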
Step 3: Geolocated Data
To scrape Pinterest, we'll also be passing the country param to the ScrapeOps API. This parameter routes our requests through a server in whichever country we choose.
- For instance, if we want to appear in the US, we set our country to "us".
- If we want to appear in the UK, we set our country to "uk".
During testing, this parameter was incredibly important. You can occasionally get blocked even when using a proxy, and that happened to us; the simple fix was to change our country from the US to the UK.
If you are following along and run into issues with a scrape that worked earlier, first try changing your location with the ScrapeOps API; that did the trick for us.
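To make that concrete, here's a small sketch reusing the get_scrapeops_url() helper from the scripts in this article; switching locations is just a matter of changing the country value in the proxy payload:

from urllib.parse import urlencode

API_KEY = "YOUR-SCRAPEOPS-API-KEY"  # placeholder

def get_scrapeops_url(url, location="us"):
    # Same helper used throughout this article
    payload = {"api_key": API_KEY, "url": url, "country": location, "wait": 2000}
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

url = "https://www.pinterest.com/search/pins/?q=grilling&rs=typed"
print(get_scrapeops_url(url, location="us"))  # routed through a US server
print(get_scrapeops_url(url, location="uk"))  # routed through a UK server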
Setting Up Our Pinterest Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir pinterest-scraper
cd pinterest-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
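If you'd like to sanity-check the install, a quick snippet like this (run with the virtual environment activated) should print both library versions without raising an ImportError:

import requests
import bs4

print(requests.__version__, bs4.__version__)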
Build A Pinterest Search Crawler
The first scraper we build is going to be our crawler. Let's get started! Our crawler is going to do the following:
- Parsing: to pull the important data from the page.
- Data Storage: to safely store our data for later use.
- Proxy: to get past anti-bots and any other potential roadblocks we may encounter.
If our crawler utilizes these things, we can:
- Fetch a page
- Extract the results
- Save the results
- Bypass any potential anti-bots or other blockers
Step 1: Create Simple Search Data Parser
Let's get started by building a parser. The goal of our parser is to fetch a page and then extract information from it.
The code structure below is relatively simple. After our imports, we read our API key with the script below:
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
- First, we declare our API key as a variable; after reading config.json, we assign the key from the file to our API_KEY variable.
- Then, we create a function, scrape_search_results(), which does the parsing.
- As long as we have retries left and the operation has not succeeded, we try to fetch the page and then pull the information from it.
- If the operation fails, we retry until it either succeeds or runs out of retries.
- If we completely run out of retries, we allow the scraper to crash and log an error message.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div")
result_count = 0
for div_card in div_cards:
if div_card.get("data-grid-item"):
result_count += 1
title = div_card.text
a_element = div_card.find("a")
url = f"https://pinterest.com{a_element['href']}"
img = div_card.find("img")
img_url = img["src"]
search_data = {
"name": title,
"url": url,
"image": img_url
}
print(search_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
The code above does the following:
- After finding all the divs, we check each one with div_card.get("data-grid-item"). Each result in our search is a data-grid-item.
- We then find each link element with div_card.find("a") and extract its URL with url = f"https://pinterest.com{a_element['href']}".
- To find our image, we use img = div_card.find("img") and then pull the link to the image with img_url = img["src"].
Step 2: Storing the Scraped Data
Now that we're extracting the proper information, we need to be able to store our data. We'll be using two separate classes for this: SearchData and DataPipeline.
- SearchData is a class built specifically to hold our data.
- DataPipeline is a pipeline to a CSV file. This class keeps duplicates from hitting our CSV and then stores the CSV safely.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div")
result_count = 0
for div_card in div_cards:
if div_card.get("data-grid-item"):
result_count += 1
title = div_card.text
a_element = div_card.find("a")
url = f"https://pinterest.com{a_element['href']}"
img = div_card.find("img")
img_url = img["src"]
search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
- After we've pulled our data from the page, we turn it into a SearchData object.
- Next, we add search_data to the pipeline with data_pipeline.add_data(search_data).
- Once our operation has finished, we close the pipeline.
Step 3: Bypassing Anti-Bots
At this point, the crawler is more or less finished; the last thing we need to add is anti-bot support.
Typically, we would not need the wait parameter in the code below, but on Pinterest all of our content is dynamically generated, so "wait": 2000 tells the ScrapeOps server to wait 2 seconds for our content to render before sending us the page.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
Take a look at our overall script now:
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div")
result_count = 0
for div_card in div_cards:
if div_card.get("data-grid-item"):
result_count += 1
title = div_card.text
a_element = div_card.find("a")
url = f"https://pinterest.com{a_element['href']}"
img = div_card.find("img")
img_url = img["src"]
search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
In this code, we parse the search results and then store them in a CSV file, all through a proxy. Our proxy is incredibly important; it does the following:
- Gets past any systems that may block us.
- Waits 2 seconds for the page to render.
- Sends us the page after it has loaded.
Step 4: Production Run
Now that we've got a working crawler, it's time to run it in production. Take a look at our main block below.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
We do a search for "grilling". Feel free to change any of the constants and tweak the code. Just remember: we don't have actual concurrency yet; that gets added when we scrape the individual pins we find with the crawler.
Here are the results from our crawler:
We crawled "grilling"
in 7.331 seconds. Results may vary based on the location of your server and the quality of your internet connection.
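If you want to time your own runs, one simple approach is to wrap the crawl in a timer; this is an optional addition rather than part of the scripts above:

import time

start = time.perf_counter()
# ... run the crawl here, e.g. scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
elapsed = time.perf_counter() - start
print(f"Crawl finished in {elapsed:.3f} seconds")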
Build A Pinterest Scraper
Next, it's time to build our Pinterest scraper. The scraper needs to be able to do the following:
- Parse the information from a pin.
- Read the rows from the CSV file.
- Store the data we extracted when parsing.
- Perform all these actions concurrently.
- Integrate with the ScrapeOps Proxy API
Step 1: Create Simple Data Parser
Let's get started building our pin parser. This parser needs to look up a pin and then pull information from it. The code below contains our process_pin() function.
Similar to our crawler, we use the retries and success model. While we still have retries left and the operation hasn't succeeded, we find the main card and pull relevant information from it.
def process_pin(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[data-test-id='CloseupDetails']")
website = "n/a"
has_website = main_card.select_one("span[style='text-decoration: underline;']")
if has_website:
website = f"https://{has_website.text}"
star_divs = main_card.select("div[data-test-id='rating-star-full']")
stars = len(star_divs)
profile_info = main_card.select_one("div[data-test-id='follower-count']")
account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
nested_divs = account_name_div.find_all("div")
account_name = nested_divs[0].get("title")
follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")
img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
img = img_container.find("img").get("src")
pin_data = {
"name": account_name,
"website": website,
"stars": stars,
"follower_count": follower_count,
"image": img
}
print(pin_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
When we're attempting to parse the pin, we do the following:
- Find the main_card using its CSS selector: main_card = soup.select_one("div[data-test-id='CloseupDetails']").
- main_card.select("div[data-test-id='rating-star-full']") finds all of the star elements on the page. We then count the stars with stars = len(star_divs).
- Find the div that holds the account information with account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']").
- nested_divs[0].get("title") gives us the account name.
- We remove the account_name and other irrelevant text with profile_info.text.replace(account_name, "").replace(" followers", ""), leaving just the follower count.
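To make that last step concrete, here's a tiny sketch with made-up values showing what the string cleanup leaves behind:

# Made-up example values to illustrate the follower_count cleanup
account_name = "Example Chef"
profile_text = "Example Chef12.5k followers"  # roughly what profile_info.text looks like

follower_count = profile_text.replace(account_name, "").replace(" followers", "")
print(follower_count)  # 12.5k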
Step 2: Loading URLs To Scrape
Now, we need to load our URLs. We can't look our pins up and parse them if we can't load the URLs from the CSV file. It's time to update our overall code to add the parsing function above and to read the CSV file.
Let's start with our process_results()
function:
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_pin(row, location, retries=retries)
Now, take a look at the overall code to see how it all fits together.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
print(scrapeops_proxy_url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div")
result_count = 0
for div_card in div_cards:
if div_card.get("data-grid-item"):
result_count += 1
title = div_card.text
a_element = div_card.find("a")
url = f"https://pinterest.com{a_element['href']}"
img = div_card.find("img")
img_url = img["src"]
search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_pin(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[data-test-id='CloseupDetails']")
website = "n/a"
has_website = main_card.select_one("span[style='text-decoration: underline;']")
if has_website:
website = f"https://{has_website.text}"
star_divs = main_card.select("div[data-test-id='rating-star-full']")
stars = len(star_divs)
profile_info = main_card.select_one("div[data-test-id='follower-count']")
account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
nested_divs = account_name_div.find_all("div")
account_name = nested_divs[0].get("title")
follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")
img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
img = img_container.find("img").get("src")
pin_data = {
"name": account_name,
"website": website,
"stars": stars,
"follower_count": follower_count,
"image": img
}
print(pin_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_pin(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
As before, we need to store our scraped data. We'll create another dataclass, PinData. Just like SearchData, the job of PinData is simply to hold data. We then pass it into a DataPipeline.
Take a look; it's almost identical to SearchData.
@dataclass
class PinData:
name: str = ""
website: str = ""
stars: int = 0
follower_count: str = ""
image: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Now, let's update our script.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class PinData:
name: str = ""
website: str = ""
stars: int = 0
follower_count: str = ""
image: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
print(scrapeops_proxy_url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div")
result_count = 0
for div_card in div_cards:
if div_card.get("data-grid-item"):
result_count += 1
title = div_card.text
a_element = div_card.find("a")
url = f"https://pinterest.com{a_element['href']}"
img = div_card.find("img")
img_url = img["src"]
search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_pin(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[data-test-id='CloseupDetails']")
pin_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
website = "n/a"
has_website = main_card.select_one("span[style='text-decoration: underline;']")
if has_website:
website = f"https://{has_website.text}"
star_divs = main_card.select("div[data-test-id='rating-star-full']")
stars = len(star_divs)
profile_info = main_card.select_one("div[data-test-id='follower-count']")
account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
nested_divs = account_name_div.find_all("div")
account_name = nested_divs[0].get("title")
follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")
img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
img = img_container.find("img").get("src")
pin_data = PinData(
name=account_name,
website=website,
stars=stars,
follower_count=follower_count,
image=img
)
pin_pipeline.add_data(pin_data)
pin_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_pin(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Aside from our new class, here are the key differences you should notice:
- We open a new DataPipeline for our PinData: pin_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv").
- Instead of printing the dictionary like we did earlier, we construct a PinData object out of it.
- We pass the PinData into our pipeline and then close the pipeline.
Step 4: Adding Concurrency
We've hit the point where we need to start thinking about performance. To achieve better performance, we need to add concurrency.
To do this, we're going to use ThreadPoolExecutor to add multithreading support to our scraper. Our MAX_THREADS constant finally gets used here.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_pin,
reader,
[location] * len(reader),
[retries] * len(reader)
)
Let's outline the different arguments to executor.map():
- process_pin is the function we want to run on multiple threads.
- reader is the list of dict objects we read from the CSV file.
- We then pass the location in as a list the same length as reader.
- We pass the retries in as a list as well.
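If the repeated-list arguments look odd, here's a tiny, generic sketch showing how executor.map() pairs up multiple iterables, taking one element from each per call, much like zip():

import concurrent.futures

def greet(name, location, retries):
    # Each call receives one element from each iterable, position by position
    return f"{name} / {location} / {retries}"

names = ["pin-a", "pin-b", "pin-c"]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(greet, names, ["uk"] * len(names), [3] * len(names))
    for result in results:
        print(result)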
Step 5: Bypassing Anti-Bots
There is one final change we need to make to our scraper. Inside of process_pin()
, we change the following line.
response = requests.get(get_scrapeops_url(url, location=location))
Here is our fully updated scraper:
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
image: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class PinData:
name: str = ""
website: str = ""
stars: int = 0
follower_count: str = ""
image: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
print(scrapeops_proxy_url)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.find_all("div")
result_count = 0
for div_card in div_cards:
if div_card.get("data-grid-item"):
result_count += 1
title = div_card.text
a_element = div_card.find("a")
url = f"https://pinterest.com{a_element['href']}"
img = div_card.find("img")
img_url = img["src"]
search_data = SearchData(
name=title,
url=url,
image=img_url
)
data_pipeline.add_data(search_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_pin(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
main_card = soup.select_one("div[data-test-id='CloseupDetails']")
pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv")
website = "n/a"
has_website = main_card.select_one("span[style='text-decoration: underline;']")
if has_website:
website = f"https://{has_website.text}"
star_divs = main_card.select("div[data-test-id='rating-star-full']")
stars = len(star_divs)
profile_info = main_card.select_one("div[data-test-id='follower-count']")
account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
nested_divs = account_name_div.find_all("div")
account_name = nested_divs[0].get("title")
follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")
img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
img = img_container.find("img").get("src")
pin_data = PinData(
name=account_name,
website=website,
stars=stars,
follower_count=follower_count,
image=img
)
pin_pipeline.add_data(pin_data)
pin_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_pin,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 6: Production Run
Now that we've got our production scraper, it's time for our production run. Once again, take a look at the main block and feel free to change any constant you want.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["grilling"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Here are the results.
The crawl and the scraping process finished in 51.88 seconds. All in all, this comes out to just over 2 seconds per page. Considering that ScrapeOps is waiting 2 seconds before sending each page back to us, 2 seconds per page is incredible!!!
Legal and Ethical Considerations
Whenever you scrape a website, you need to be aware of its Terms of Service and robots.txt. You can view Pinterest's terms here.
If you access private data on their site in a way that violates these terms, you can even lose your Pinterest account! You can view their robots.txt here.
Also, pay attention to whether you are scraping public or private data. Private data (data behind a login) can often be illegal to scrape. Generally, public data (data not behind a login) is public information and therefore fair game when scraping.
If you are unsure of the legality of your scraper, it is best to consult an attorney based in your jurisdiction.
Conclusion
You made it! Congratulations on finishing this tutorial. You now know how to build both a crawler and a scraper. You also have a solid grasp of parsing, data storage, concurrency, and proxy integration.
You should also have a solid grasp of how to use requests and beautifulsoup.
More Web Scraping Guides
Now that you know how to scrape Pinterest, you have a whole new skillset for your scraping toolbox. Take this knowledge and go build something!!!
If you're in the mood to learn more, check out our Python Web Scraping Playbook or one of these cool articles below!!!