Then check out ScrapeOps, the complete toolkit for web scraping. To use the scraper below, create a
config.json
file with your "api_key"
and place it in the same folder as this scraper. At that point, it's ready to go!

import os
import csv
import requests
import json
import logging
import time  # needed for DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            print(scrapeops_proxy_url)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div")

            result_count = 0

            for div_card in div_cards:
                if div_card.get("data-grid-item"):
                    result_count += 1

                    title = div_card.text
                    a_element = div_card.find("a")
                    url = f"https://pinterest.com{a_element['href']}"
                    img = div_card.find("img")
                    img_url = img["src"]

                    search_data = SearchData(
                        name=title,
                        url=url,
                        image=img_url
                    )
                    data_pipeline.add_data(search_data)

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def process_pin(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                main_card = soup.select_one("div[data-test-id='CloseupDetails']")

                website = "n/a"
                has_website = main_card.select_one("span[style='text-decoration: underline;']")
                if has_website:
                    website = f"https://{has_website.text}"

                star_divs = main_card.select("div[data-test-id='rating-star-full']")
                stars = len(star_divs)

                profile_info = main_card.select_one("div[data-test-id='follower-count']")
                account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
                nested_divs = account_name_div.find_all("div")
                account_name = nested_divs[0].get("title")
                follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

                img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
                img = img_container.find("img").get("src")

                pin_data = {
                    "name": account_name,
                    "website": website,
                    "stars": stars,
                    "follower_count": follower_count,
                    "image": img
                }

                print(pin_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_pin(row, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Feel free to change any of the following constants to fit your needs:
MAX_RETRIES: This parameter sets the maximum number of attempts the script will make to fetch data from a URL if the initial request fails.
MAX_THREADS: This parameter sets the maximum number of threads to use for processing results concurrently. This can speed up the processing of multiple pins or search results.
LOCATION: This parameter sets the geographical location from which the requests are made. It can affect the content returned by the website due to region-specific restrictions or differences.
keyword_list: This list contains the keywords for which you want to scrape Pinterest search results.
When we perform a search on Pinterest, our URL looks like this:
https://www.pinterest.com/search/pins/?q=grilling&rs=typed
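In the crawler itself, this URL gets built from the keyword with a simple f-string. Here is that construction in isolation (the keyword below is just an example):

# How the search URL gets built from a keyword.
keyword = "grilled chicken"
formatted_keyword = keyword.replace(" ", "+")
url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
print(url)  # https://www.pinterest.com/search/pins/?q=grilled+chicken&rs=typed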
Our base URL is https://www.pinterest.com/search/pins/ and our query parameters are q=grilling&rs=typed. rs=typed is a standard param that gets added to the URL when you perform a search on Pinterest. q=grilling contains the actual keywords we're searching for (in this case, "grilling").
Individual pins have URLs like this one: https://www.pinterest.com/pin/45176802505307132/. Here, https://www.pinterest.com/pin/ tells the server that we want a pin, and 45176802505307132 represents the number of the pin.
Because Pinterest generates its content dynamically, we also pass the wait argument into the ScrapeOps API. The wait param tells the ScrapeOps server to wait a certain amount of time for the content to render and then send the page results back to us.
Most of the important elements on the pin page come with a data-test-id. When scraping the pin page, we'll be using data-test-id to find most of our relevant information.
We can pass a country param to the ScrapeOps API as well. This parameter allows us to be routed through a server in whichever country we choose. If we want to appear in the US, we pass "us". If we want to appear in the UK, we pass "uk".
Let's get started. First, make a new project folder and move into it:
mkdir pinterest-scraper
cd pinterest-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
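Before writing any code, create a config.json in the same folder. It uses the same format shown later in this guide; just swap in your own ScrapeOps API key:

{
    "api_key": "YOUR-SUPER-SECRET-API-KEY"
}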
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]
After reading config.json, we assign the key from the file to our API_KEY variable. Next comes scrape_search_results(), which does the parsing. Inside this function, we try to get the page and then pull the information from it.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.find_all("div")

            result_count = 0

            for div_card in div_cards:
                if div_card.get("data-grid-item"):
                    result_count += 1

                    title = div_card.text
                    a_element = div_card.find("a")
                    url = f"https://pinterest.com{a_element['href']}"
                    img = div_card.find("img")
                    img_url = img["src"]

                    search_data = {
                        "name": title,
                        "url": url,
                        "image": img_url
                    }
                    print(search_data)

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
Once we've found all of our divs, we check each one with div_card.get("data-grid-item"). Each result in our search is a data-grid-item. We find the link element with div_card.find("a") and we extract it with url = f"https://pinterest.com{a_element['href']}". We find the image with img = div_card.find("img") and we then pull the link to the image with img_url = img["src"].
To store this data properly, we add two new classes: SearchData and DataPipeline. SearchData is a class built specifically to hold our data. DataPipeline
is a pipeline to a CSV file. This class filters out duplicates from hitting our CSV and then stores the CSV safely.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div") result_count = 0 for div_card in div_cards: if div_card.get("data-grid-item"): result_count += 1 title = div_card.text a_element = div_card.find("a") url = f"https://pinterest.com{a_element['href']}" img = div_card.find("img") img_url = img["src"] search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
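If you'd like to see how these two classes fit together before wiring them into the crawler, here is a minimal sketch. The filename and values below are made up purely for illustration:

# Hypothetical usage of SearchData and DataPipeline on their own.
pipeline = DataPipeline(csv_filename="example.csv")
pipeline.add_data(SearchData(name="Grilled Salmon", url="https://pinterest.com/pin/1/", image="https://i.pinimg.com/a.jpg"))
# The second item has the same name, so is_duplicate() drops it.
pipeline.add_data(SearchData(name="Grilled Salmon", url="https://pinterest.com/pin/2/", image="https://i.pinimg.com/b.jpg"))
pipeline.close_pipeline()  # flushes anything left in the queue to example.csv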
Instead of printing our data, we now create a SearchData object and pass the resulting search_data to the pipeline with data_pipeline.add_data(search_data).
You may not always need the wait parameter in the code below, but on Pinterest, all of our content is dynamically generated, so "wait": 2000 tells the ScrapeOps server to wait 2 seconds for our content to render and then it sends us the page.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
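If you're curious what the proxied URL actually looks like, you can call the function directly. The output below is abbreviated and depends on the key in your config:

# Wrap a Pinterest search URL with the ScrapeOps proxy.
target = "https://www.pinterest.com/search/pins/?q=grilling&rs=typed"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.pinterest.com%2Fsearch%2Fpins%2F%3Fq%3Dgrilling%26rs%3Dtyped&country=us&wait=2000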
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div") result_count = 0 for div_card in div_cards: if div_card.get("data-grid-item"): result_count += 1 title = div_card.text a_element = div_card.find("a") url = f"https://pinterest.com{a_element['href']}" img = div_card.find("img") img_url = img["src"] search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
In addition to routing us through a proxy, ScrapeOps will now wait 2 seconds for the page to render. Time to test everything out in main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

In this production run, we crawl "grilling".
Feel free to change any of the constants yourself and tweak the code; just remember, we don't have actual concurrency yet. That will be added when we're scraping the individual posts that we find with the crawler.
Here are the results from our crawler: the "grilling" search finished in 7.331 seconds. Results may vary based on the location of your server and the quality of your internet connection.
Next, we parse the individual pins with a process_pin() function. Similar to our crawler, we use the retries and success model. While we still have retries left and the operation hasn't succeeded, we find the main card and pull relevant information from it.

def process_pin(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")

                main_card = soup.select_one("div[data-test-id='CloseupDetails']")

                website = "n/a"
                has_website = main_card.select_one("span[style='text-decoration: underline;']")
                if has_website:
                    website = f"https://{has_website.text}"

                star_divs = main_card.select("div[data-test-id='rating-star-full']")
                stars = len(star_divs)

                profile_info = main_card.select_one("div[data-test-id='follower-count']")
                account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']")
                nested_divs = account_name_div.find_all("div")
                account_name = nested_divs[0].get("title")
                follower_count = profile_info.text.replace(account_name, "").replace(" followers", "")

                img_container = soup.select_one("div[data-test-id='pin-closeup-image']")
                img = img_container.find("img").get("src")

                pin_data = {
                    "name": account_name,
                    "website": website,
                    "stars": stars,
                    "follower_count": follower_count,
                    "image": img
                }

                print(pin_data)
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
First, we find the main_card using its CSS selector: main_card = soup.select_one("div[data-test-id='CloseupDetails']"). main_card.select("div[data-test-id='rating-star-full']") finds all of the star elements on the page. We then count the stars with stars = len(star_divs). The holder for the account name comes from account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']"), and nested_divs[0].get("title") finds our account name. To get the follower count, we remove the account_name and other irrelevant text with profile_info.text.replace(account_name, "").replace(" followers", "") (there is a small worked example of this cleanup after the next function).
To read the rows from our CSV file and run process_pin() on each one, we use a process_results() function:

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_pin(row, location, retries=retries)
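To make that follower-count cleanup concrete, here is a tiny standalone example with made-up text that mimics what profile_info.text might contain:

# Hypothetical text cleanup, mirroring the replace() chain above.
account_name = "Grill Masters"
profile_text = "Grill Masters 12.5k followers"
follower_count = profile_text.replace(account_name, "").replace(" followers", "")
print(follower_count)  # " 12.5k" (leading space and all; the real page text may differ)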
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) print(scrapeops_proxy_url) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div") result_count = 0 for div_card in div_cards: if div_card.get("data-grid-item"): result_count += 1 title = div_card.text a_element = div_card.find("a") url = f"https://pinterest.com{a_element['href']}" img = div_card.find("img") img_url = img["src"] search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_pin(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") main_card = soup.select_one("div[data-test-id='CloseupDetails']") website = "n/a" has_website = main_card.select_one("span[style='text-decoration: underline;']") if has_website: website = f"https://{has_website.text}" star_divs = main_card.select("div[data-test-id='rating-star-full']") stars = len(star_divs) profile_info = main_card.select_one("div[data-test-id='follower-count']") account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']") nested_divs = account_name_div.find_all("div") account_name = nested_divs[0].get("title") follower_count = profile_info.text.replace(account_name, "").replace(" followers", "") img_container = soup.select_one("div[data-test-id='pin-closeup-image']") img = img_container.find("img").get("src") pin_data = { "name": account_name, "website": website, "stars": stars, "follower_count": follower_count, "image": img } print(pin_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: 
process_pin(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To hold the data we scrape from each pin, we add another dataclass, PinData. Just like SearchData, the job of PinData is simply to hold data. We then go ahead and pass it into a DataPipeline. Take a look, it's almost identical to SearchData.

@dataclass
class PinData:
    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
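A quick sanity check of how those string fields behave (the values here are invented):

# PinData fills empty strings with defaults and strips stray whitespace.
pin = PinData(name="  Grill Masters  ", website="", stars=4, follower_count="12.5k", image="")
print(pin.name)     # "Grill Masters"
print(pin.website)  # "No website"
print(pin.image)    # "No image"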
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass PinData: name: str = "" website: str = "" stars: int = 0 follower_count: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) print(scrapeops_proxy_url) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div") result_count = 0 for div_card in div_cards: if div_card.get("data-grid-item"): result_count += 1 title = div_card.text a_element = div_card.find("a") url = f"https://pinterest.com{a_element['href']}" img = div_card.find("img") img_url = img["src"] search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_pin(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") main_card = soup.select_one("div[data-test-id='CloseupDetails']") pin_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") website = "n/a" has_website = main_card.select_one("span[style='text-decoration: underline;']") if has_website: website = f"https://{has_website.text}" star_divs = main_card.select("div[data-test-id='rating-star-full']") stars = len(star_divs) profile_info = main_card.select_one("div[data-test-id='follower-count']") account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']") nested_divs = account_name_div.find_all("div") account_name = nested_divs[0].get("title") follower_count = profile_info.text.replace(account_name, "").replace(" followers", "") img_container = soup.select_one("div[data-test-id='pin-closeup-image']") img = img_container.find("img").get("src") pin_data = PinData( name=account_name, website=website, stars=stars, follower_count=follower_count, image=img ) pin_pipeline.add_data(pin_data) pin_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing 
{csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_pin(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Inside process_pin(), we now open a new DataPipeline for our PinData: pin_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv"). Instead of printing a dict, we create a PinData object out of it. We then add the PinData into our pipeline and then close the pipeline.
Next, we use ThreadPoolExecutor to add multithreading support to our scraper. Our MAX_THREADS constant will finally get used now.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_pin,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
Note the arguments we pass to executor.map():
process_pin is the function that we're calling to run on multiple threads.
reader is an array of dict objects that we read from the CSV file.
The remaining arguments, location and retries, get passed in as arrays the same length as reader.
To use the proxy inside process_pin(), we change the following line.

response = requests.get(get_scrapeops_url(url, location=location))
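Going back to executor.map() for a moment: if the argument broadcasting looks unfamiliar, this tiny standalone sketch (with toy data, nothing Pinterest-specific) shows the same pattern:

import concurrent.futures

def greet(row, location, retries):
    return f"{row['name']} / {location} / {retries}"

rows = [{"name": "a"}, {"name": "b"}, {"name": "c"}]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(greet, rows, ["us"] * len(rows), [3] * len(rows))
print(list(results))  # ['a / us / 3', 'b / us / 3', 'c / us / 3']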
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass PinData: name: str = "" website: str = "" stars: int = 0 follower_count: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) print(scrapeops_proxy_url) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.find_all("div") result_count = 0 for div_card in div_cards: if div_card.get("data-grid-item"): result_count += 1 title = div_card.text a_element = div_card.find("a") url = f"https://pinterest.com{a_element['href']}" img = div_card.find("img") img_url = img["src"] search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_pin(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") main_card = soup.select_one("div[data-test-id='CloseupDetails']") pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv") website = "n/a" has_website = main_card.select_one("span[style='text-decoration: underline;']") if has_website: website = f"https://{has_website.text}" star_divs = main_card.select("div[data-test-id='rating-star-full']") stars = len(star_divs) profile_info = main_card.select_one("div[data-test-id='follower-count']") account_name_div = profile_info.select_one("div[data-test-id='creator-profile-name']") nested_divs = account_name_div.find_all("div") account_name = nested_divs[0].get("title") follower_count = profile_info.text.replace(account_name, "").replace(" followers", "") img_container = soup.select_one("div[data-test-id='pin-closeup-image']") img = img_container.find("img").get("src") pin_data = PinData( name=account_name, website=website, stars=stars, follower_count=follower_count, image=img ) pin_pipeline.add_data(pin_data) pin_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, 
max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_pin, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Once again, everything gets run from main, and feel free to change any constant you want.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
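Once a run finishes, you can spot-check the crawl file straight from Python. This assumes the grilling.csv produced by the run above:

import csv

# Print what the crawler stored for each result.
with open("grilling.csv", newline="") as f:
    for row in csv.DictReader(f):
        print(row["name"], row["url"], row["image"])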
When you scrape Pinterest, you are subject to their Terms of Service and robots.txt. You can view Pinterest's terms here. If you access private data on their site in a way that violates these terms, you can even lose your Pinterest account! You can view their robots.txt here.
Also, keep in mind whether you are scraping public data. Private data (data behind a login) can often be illegal to scrape. Generally, public data (data not behind a login) is public information and therefore fair game when scraping. If you are unsure of the legality of your scraper, it is best to consult an attorney based in your jurisdiction.
You now know how to crawl and scrape Pinterest with requests and beautifulsoup.
Then check out ScrapeOps, the complete toolkit for web scraping. To run the Selenium version of this scraper, create a
config.json
file with your API key and place it in the same folder as this file and you're ready to go!import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdictfrom time import sleep OPTIONS = webdriver.ChromeOptions() prefs = { "profile.managed_default_content_settings.javascript": 2}OPTIONS.add_experimental_option("prefs", prefs) user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"OPTIONS.add_argument(f"useragent={user_agent}") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass PinData: name: str = "" website: str = "" stars: int = 0 follower_count: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") tries = 0 success = False while tries <= retries and not success: url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" driver = webdriver.Chrome(options=OPTIONS) driver.set_page_load_timeout(30) driver.implicitly_wait(10) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div") print("found div cards:", len(div_cards)) for div_card in div_cards: is_card = div_card.get_attribute("data-grid-item") if is_card: a_element = div_card.find_element(By.CSS_SELECTOR, "a") title = a_element.get_attribute("aria-label") href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") url = f"https://pinterest.com{href}" img = div_card.find_element(By.CSS_SELECTOR, "img") img_url = img.get_attribute("src") search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_pin(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.get(get_scrapeops_url(url, location=location)) try: main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']") pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv") website = "n/a" website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']") has_website = len(website_holder) > 0 if has_website: website = f"https://{website_holder[0].text}" star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']") stars = len(star_divs) profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']") account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']") nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div") account_name = nested_divs[0].get_attribute("title") follower_count = profile_info.text.replace(account_name, "").replace(" followers", "") img = "n/a" img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']") if len(img_container) > 0: img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src") pin_data = PinData( name=account_name, website=website, stars=stars, follower_count=follower_count, image=img ) pin_pipeline.add_data(pin_data) pin_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries 
left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_pin, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Feel free to change any of the following constants to fit your needs:
MAX_RETRIES: This parameter sets the maximum number of attempts the script will make to fetch data from a URL if the initial request fails.
MAX_THREADS: This parameter sets the maximum number of threads to use for processing results concurrently. This can speed up the processing of multiple pins or search results.
LOCATION: This parameter sets the geographical location from which the requests are made. It can affect the content returned by the website due to region-specific restrictions or differences.
keyword_list: This list contains the keywords for which you want to scrape Pinterest search results.
When we search for "grilling", our URL looks like this:
https://www.pinterest.com/search/pins/?q=grilling&rs=typed
Our base URL is https://www.pinterest.com/search/pins/. The ? character denotes our queries, which get separated by the & if we have multiple queries. In this case, our query values are grilling and typed. The full query string is ?q=grilling&rs=typed.
Individual pins have URLs like this one: https://www.pinterest.com/pin/45176802505307132/. The pin number here is 45176802505307132. For any pin on Pinterest, the URL gets laid out like this:
https://www.pinterest.com/pin/PIN-NUMBER-GOES-HERE/
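If you ever need to rebuild a pin's URL from its number, it's just string formatting; the number below is the one from the example above:

# Build a pin URL from a pin number.
pin_number = "45176802505307132"
pin_url = f"https://www.pinterest.com/pin/{pin_number}/"
print(pin_url)  # https://www.pinterest.com/pin/45176802505307132/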
To control our location, we pass the country parameter into the ScrapeOps API. If you pass "us" in as your country, you'll be routed through a server in the US. To appear in the UK, you would pass "uk".
Now, let's get the project set up. Create a new project folder and move into it:
mkdir pinterest-scraper
cd pinterest-scraper
python -m venv venv
source venv/bin/activate
pip install selenium
You'll also need a config.json. Simply create this file and add your API key to it. The entire config file should look like this:

{
    "api_key": "YOUR-SUPER-SECRET-API-KEY"
}
import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict
from time import sleep

OPTIONS = webdriver.ChromeOptions()

prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
OPTIONS.add_argument(f"useragent={user_agent}")

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    tries = 0
    success = False

    while tries <= retries and not success:
        url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed"
        driver = webdriver.Chrome(options=OPTIONS)
        driver.set_page_load_timeout(30)
        driver.implicitly_wait(10)
        try:
            driver.get(url)
            logger.info(f"Fetched {url}")

            ## Extract Data
            div_cards = driver.find_elements(By.CSS_SELECTOR, "div")
            print("found div cards:", len(div_cards))

            for div_card in div_cards:
                is_card = div_card.get_attribute("data-grid-item")
                if is_card:
                    a_element = div_card.find_element(By.CSS_SELECTOR, "a")
                    title = a_element.get_attribute("aria-label")
                    href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "")
                    url = f"https://pinterest.com{href}"
                    img = div_card.find_element(By.CSS_SELECTOR, "img")
                    img_url = img.get_attribute("src")

                    search_data = {
                        "name": title,
                        "url": url,
                        "image": img_url
                    }
                    print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["grilling"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
is_card = div_card.get_attribute("data-grid-item")
determines whether or not each div is a search result. All search results contain the attribute data-grid-item
. We pull the pin title from a_element.get_attribute("aria-label") and the link from a_element.get_attribute("href").replace("https://proxy.scrapeops.io", ""). We then replace the ScrapeOps URL with Pinterest's URL. Once everything has been extracted, we set success to True and exit the function.
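As an aside, since every result card carries the data-grid-item attribute, you could likely target the cards directly with a CSS attribute selector instead of scanning every div on the page. Here's a rough sketch of that idea (it reuses the OPTIONS object from the script above and skips the proxy purely for brevity):

from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome(options=OPTIONS)  # OPTIONS configured as in the script above
driver.set_page_load_timeout(30)
driver.implicitly_wait(10)
driver.get("https://www.pinterest.com/search/pins/?q=grilling&rs=typed")

# Match only the result cards instead of every div on the page
for div_card in driver.find_elements(By.CSS_SELECTOR, "div[data-grid-item]"):
    a_element = div_card.find_element(By.CSS_SELECTOR, "a")
    print(a_element.get_attribute("aria-label"))

driver.quit()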
Next, we add a SearchData class and a DataPipeline class. SearchData simply takes our data and turns it into a uniform object that holds it. Once we've created a SearchData object, we can then pass it into the DataPipeline
which filters out our duplicates and saves all of our relevant information to a CSV file.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdictfrom time import sleep OPTIONS = webdriver.ChromeOptions() prefs = { "profile.managed_default_content_settings.javascript": 2}OPTIONS.add_experimental_option("prefs", prefs) user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"OPTIONS.add_argument(f"useragent={user_agent}") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") tries = 0 success = False while tries <= retries and not success: url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" driver = webdriver.Chrome(options=OPTIONS) driver.set_page_load_timeout(30) driver.implicitly_wait(10) try: driver.get(url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div") print("found div cards:", len(div_cards)) for div_card in div_cards: is_card = div_card.get_attribute("data-grid-item") if is_card: a_element = div_card.find_element(By.CSS_SELECTOR, "a") title = a_element.get_attribute("aria-label") href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") url = f"https://pinterest.com{href}" img = div_card.find_element(By.CSS_SELECTOR, "img") img_url = img.get_attribute("src") search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
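As a quick aside, if you ever want to sanity check SearchData and the DataPipeline on their own before running a full crawl, a minimal sketch with dummy values (assuming both classes from the script above are in scope) looks like this:

# Assumes SearchData and DataPipeline from the script above are in scope
pipeline = DataPipeline(csv_filename="test-output.csv")

item = SearchData(name="Test Pin", url="https://pinterest.com/pin/123/", image="")
# Empty strings get replaced with "No <field>" by check_string_fields()
print(item.image)  # No image

pipeline.add_data(item)
pipeline.add_data(item)   # second add is logged as a duplicate name and dropped
pipeline.close_pipeline() # flushes whatever is left in the queue out to the CSV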
Instead of turning search_data into a dict, we use it to build a SearchData object. After building search_data, we pass it into the data_pipeline. At the end of scrape_search_results(), we close the pipeline. Also pay attention to our ChromeOptions
. In the prefs, you should see "profile.managed_default_content_settings.javascript": 2. This turns off JavaScript support.

OPTIONS = webdriver.ChromeOptions()

prefs = {
    "profile.managed_default_content_settings.javascript": 2
}
OPTIONS.add_experimental_option("prefs", prefs)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
OPTIONS.add_argument(f"useragent={user_agent}")
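One thing worth double-checking on your end: Chrome's documented command-line switch for overriding the user agent is --user-agent=..., so the useragent= argument above may simply be ignored by some Chrome/ChromeDriver builds. If you want to be explicit about it, a variant with only the flag name changed might look like this:

from selenium import webdriver

OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_experimental_option("prefs", {
    "profile.managed_default_content_settings.javascript": 2  # disable JavaScript
})
OPTIONS.add_argument(
    "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
)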
Our proxy function stays the same:

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
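For reference, here's roughly what a call to this function produces; the api_key value is pulled from config.json, so it's abbreviated here:

proxied_url = get_scrapeops_url(
    "https://www.pinterest.com/search/pins/?q=grilling&rs=typed",
    location="uk"
)
print(proxied_url)
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.pinterest.com%2Fsearch%2Fpins%2F%3Fq%3Dgrilling%26rs%3Dtyped&country=uk&wait=2000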
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdictfrom time import sleep OPTIONS = webdriver.ChromeOptions() prefs = { "profile.managed_default_content_settings.javascript": 2}OPTIONS.add_experimental_option("prefs", prefs) user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"OPTIONS.add_argument(f"useragent={user_agent}") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") tries = 0 success = False while tries <= retries and not success: url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" driver = webdriver.Chrome(options=OPTIONS) driver.set_page_load_timeout(30) driver.implicitly_wait(10) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div") print("found div cards:", len(div_cards)) for div_card in div_cards: is_card = div_card.get_attribute("data-grid-item") if is_card: a_element = div_card.find_element(By.CSS_SELECTOR, "a") title = a_element.get_attribute("aria-label") href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") url = f"https://pinterest.com{href}" img = div_card.find_element(By.CSS_SELECTOR, "img") img_url = img.get_attribute("src") search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
main
.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
def process_pin(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) try: main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']") website = "n/a" website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']") has_website = len(website_holder) > 0 if has_website: website = f"https://{website_holder[0].text}" star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']") stars = len(star_divs) profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']") account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']") nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div") account_name = nested_divs[0].get_attribute("title") follower_count = profile_info.text.replace(account_name, "").replace(" followers", "") img = "n/a" img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']") if len(img_container) > 0: img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src") pin_data = { "name": account_name, "website": website, "stars": stars, "follower_count": follower_count, "image": img } print(pin_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}")
We use find_elements() to look for website_holder. If the length of website_holder is greater than zero, there is a website present, so we reassign the website from "n/a" to the actual website. We then collect star_divs. Each star is a unique element on the page, so we can obtain the rating by counting these elements. We also pull the follower_count, account_name and the image of the pin.
Next, we use csv.DictReader() to read the file into an array. Each row from the file then gets passed into process_pin(). For the moment, we do this with a simple for loop as a placeholder.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_pin(row, location, retries=retries)
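Each row that csv.DictReader() hands back is just a dict keyed by the CSV header, so the rows read from the crawl file look roughly like the comment below (the values shown are made up for illustration):

import csv

with open("grilling.csv", newline="") as file:
    reader = list(csv.DictReader(file))

# Each row is a dict with the SearchData fields as keys, e.g.
# {"name": "...", "url": "https://pinterest.com/pin/...", "image": "https://i.pinimg.com/..."}
for row in reader:
    print(row["url"])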
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdictfrom time import sleep OPTIONS = webdriver.ChromeOptions() prefs = { "profile.managed_default_content_settings.javascript": 2}OPTIONS.add_experimental_option("prefs", prefs) user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"OPTIONS.add_argument(f"useragent={user_agent}") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") tries = 0 success = False while tries <= retries and not success: url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" driver = webdriver.Chrome(options=OPTIONS) driver.set_page_load_timeout(30) driver.implicitly_wait(10) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div") print("found div cards:", len(div_cards)) for div_card in div_cards: is_card = div_card.get_attribute("data-grid-item") if is_card: a_element = div_card.find_element(By.CSS_SELECTOR, "a") title = a_element.get_attribute("aria-label") href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") url = f"https://pinterest.com{href}" img = div_card.find_element(By.CSS_SELECTOR, "img") img_url = img.get_attribute("src") search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_pin(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) try: main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']") website = "n/a" website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']") has_website = len(website_holder) > 0 if has_website: website = f"https://{website_holder[0].text}" star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']") stars = len(star_divs) profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']") account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']") nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div") account_name = nested_divs[0].get_attribute("title") follower_count = profile_info.text.replace(account_name, "").replace(" followers", "") img = "n/a" img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']") if len(img_container) > 0: img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src") pin_data = { "name": account_name, "website": website, "stars": stars, "follower_count": follower_count, "image": img } print(pin_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully 
parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_pin(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To store the data we scrape from each pin, we add another dataclass, PinData. Take a look below; our PinData is actually very similar to SearchData. This object will even be passed into the DataPipeline the same way.

@dataclass
class PinData:
    name: str = ""
    website: str = ""
    stars: int = 0
    follower_count: str = ""
    image: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
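In the full script below, each pin is written to its own small CSV named after the first 20 characters of the pin's name. If you want to see exactly what that filename expression does, here's a tiny sketch with a made-up row:

# Hypothetical row, shaped like one line of the crawl CSV
row = {"name": "Grilled chicken skewers with lemon", "url": "https://pinterest.com/pin/...", "image": "..."}

# The same truncate-and-replace logic used when building the per-pin pipeline
csv_filename = f"{row['name'][0:20].replace(' ', '-')}.csv"
print(csv_filename)  # Grilled-chicken-skew.csv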
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdictfrom time import sleep OPTIONS = webdriver.ChromeOptions() prefs = { "profile.managed_default_content_settings.javascript": 2}OPTIONS.add_experimental_option("prefs", prefs) user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"OPTIONS.add_argument(f"useragent={user_agent}") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) print(proxy_url) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass PinData: name: str = "" website: str = "" stars: int = 0 follower_count: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") tries = 0 success = False while tries <= retries and not success: url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" driver = webdriver.Chrome(options=OPTIONS) driver.set_page_load_timeout(30) driver.implicitly_wait(10) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div") print("found div cards:", len(div_cards)) for div_card in div_cards: is_card = div_card.get_attribute("data-grid-item") if is_card: a_element = div_card.find_element(By.CSS_SELECTOR, "a") title = a_element.get_attribute("aria-label") href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") url = f"https://pinterest.com{href}" img = div_card.find_element(By.CSS_SELECTOR, "img") img_url = img.get_attribute("src") search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_pin(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) try: main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']") pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv") website = "n/a" website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']") has_website = len(website_holder) > 0 if has_website: website = f"https://{website_holder[0].text}" star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']") stars = len(star_divs) profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']") account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']") nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div") account_name = nested_divs[0].get_attribute("title") follower_count = profile_info.text.replace(account_name, "").replace(" followers", "") img = "n/a" img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']") if len(img_container) > 0: img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src") pin_data = PinData( name=account_name, website=website, stars=stars, follower_count=follower_count, image=img ) pin_pipeline.add_data(pin_data) pin_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 
finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_pin(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Inside process_pin(), we now instantiate a DataPipeline object. Instead of printing a dict, we build a PinData object and pass that pin_data variable into the data_pipeline. To finish things off, we add concurrency with ThreadPoolExecutor. Here we make a simple, but big change to process_results().

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_pin,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
Pay attention to the arguments we pass into executor.map():
process_pin is the function we wish to run on multiple threads.
reader is the array of objects we want to pass into the function.
location and our retries get passed in as arrays.
To plug our get_scrapeops_url() function back in, we just need to add it into one line:
driver.get(get_scrapeops_url(url, location=location))
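If the [location] * len(reader) pattern looks strange, remember that executor.map() walks all of its iterables in lockstep, handing one element from each iterable to every call. Here's a toy example of that behavior, unrelated to the scraper itself:

import concurrent.futures

def process(row, location, retries):
    return f"{row} / {location} / {retries}"

rows = ["pin-1", "pin-2", "pin-3"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        process,
        rows,
        ["us"] * len(rows),  # the same location, repeated once per row
        [3] * len(rows)      # the same retry count, repeated once per row
    )
    for result in results:
        print(result)
# pin-1 / us / 3
# pin-2 / us / 3
# pin-3 / us / 3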
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdictfrom time import sleep OPTIONS = webdriver.ChromeOptions() prefs = { "profile.managed_default_content_settings.javascript": 2}OPTIONS.add_experimental_option("prefs", prefs) user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"OPTIONS.add_argument(f"useragent={user_agent}") API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass PinData: name: str = "" website: str = "" stars: int = 0 follower_count: str = "" image: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") tries = 0 success = False while tries <= retries and not success: url = f"https://www.pinterest.com/search/pins/?q={formatted_keyword}&rs=typed" driver = webdriver.Chrome(options=OPTIONS) driver.set_page_load_timeout(30) driver.implicitly_wait(10) try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) logger.info(f"Fetched {url}") ## Extract Data div_cards = driver.find_elements(By.CSS_SELECTOR, "div") print("found div cards:", len(div_cards)) for div_card in div_cards: is_card = div_card.get_attribute("data-grid-item") if is_card: a_element = div_card.find_element(By.CSS_SELECTOR, "a") title = a_element.get_attribute("aria-label") href = a_element.get_attribute("href").replace("https://proxy.scrapeops.io", "") url = f"https://pinterest.com{href}" img = div_card.find_element(By.CSS_SELECTOR, "img") img_url = img.get_attribute("src") search_data = SearchData( name=title, url=url, image=img_url ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_pin(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=OPTIONS) driver.get(get_scrapeops_url(url, location=location)) try: main_card = driver.find_element(By.CSS_SELECTOR, "div[data-test-id='CloseupDetails']") pin_pipeline = DataPipeline(csv_filename=f"{row['name'][0:20].replace(' ', '-')}.csv") website = "n/a" website_holder = main_card.find_elements(By.CSS_SELECTOR, "span[style='text-decoration: underline;']") has_website = len(website_holder) > 0 if has_website: website = f"https://{website_holder[0].text}" star_divs = main_card.find_elements(By.CSS_SELECTOR, "div[data-test-id='rating-star-full']") stars = len(star_divs) profile_info = main_card.find_element(By.CSS_SELECTOR, "div[data-test-id='follower-count']") account_name_div = profile_info.find_element(By.CSS_SELECTOR, "div[data-test-id='creator-profile-name']") nested_divs = account_name_div.find_elements(By.CSS_SELECTOR, "div") account_name = nested_divs[0].get_attribute("title") follower_count = profile_info.text.replace(account_name, "").replace(" followers", "") img = "n/a" img_container = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='pin-closeup-image']") if len(img_container) > 0: img = img_container[0].find_element(By.CSS_SELECTOR, "img").get_attribute("src") pin_data = PinData( name=account_name, website=website, stars=stars, follower_count=follower_count, image=img ) pin_pipeline.add_data(pin_data) pin_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries 
left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_pin, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
main
again and tweak whatever constants you'd like.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["grilling"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") scrape_search_results(keyword, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
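For example, if you wanted to crawl a couple of searches through a UK-based server, you might adjust only the constants ("smoked brisket" here is just an example keyword):

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

## INPUT ---> List of keywords to scrape
keyword_list = ["grilling", "smoked brisket"]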
When scraping Pinterest, you need to pay attention to their Terms of Service and their robots.txt. You can view Pinterest's terms here. If you access private data on their site in a way that violates these terms, you can even lose your Pinterest account! You can view their robots.txt here.
Also, keep in mind whether you are scraping public data. Private data (data behind a login) can often be illegal to scrape. Generally, public data (data not behind a login) is public information and therefore fair game when scraping. If you are unsure of the legality of your scraper, it is best to consult an attorney based in your jurisdiction.
Then check out ScrapeOps, the complete toolkit for web scraping.
To follow along, you'll need a config.json
file with your API key.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe( csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true, }) ); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, wait: 3000, residential: true, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults( browser, keyword, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); await page.setJavaScriptEnabled(false); try { const url = `https://www.pinterest.com/search/pins/?q=${formattedKeyword}&rs=typed`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-grid-item='true']"); for (const divCard of divCards) { const aElement = await divCard.$('a'); const name = await page.evaluate( (element) => element.getAttribute('aria-label'), aElement ); const href = await page.evaluate( (element) => element.getAttribute('href'), aElement ); const imgElement = await divCard.$('img'); const imgLink = await page.evaluate( (element) => element.getAttribute('src'), imgElement ); const searchData = { name: name, url: `https://www.pinterest.com${href.replace('https://proxy.scrapeops.io', '')}`, image: imgLink, }; await writeToCsv([searchData], `${keyword.replace(' ', '-')}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, location, retries) { const browser = await puppeteer.launch(); await scrapeSearchResults(browser, keyword, location, retries); await browser.close();} async function processPin(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); await page.setExtraHTTPHeaders({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36', }); try { await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 }); const mainCard = await page.$("div[data-test-id='CloseupDetails']"); if (!mainCard) { throw new Error('Failed to load the page!'); } let website = 'n/a'; const websiteHolder = await page.$( "span[style='text-decoration: underline;']" ); if (websiteHolder) { website = await page.evaluate( (element) => element.textContent, websiteHolder ); } const starDivs = await page.$$("div[data-test-id='rating-star-full']"); 
const stars = starDivs.length; const profileInfoDiv = await mainCard.$( "div[data-test-id='follower-count']" ); if (profileInfoDiv === null) { throw new Error('Page failed to loaded, most likely blocked!'); } const profileText = await page.evaluate( (element) => element.textContent, profileInfoDiv ); const accountNameDiv = await profileInfoDiv.$( "div[data-test-id='creator-profile-name']" ); const nestedDiv = await accountNameDiv.$('div'); const accountName = await page.evaluate( (element) => element.getAttribute('title'), nestedDiv ); const followerCount = profileText .replace(accountName, '') .replace(' followers', ''); const pinData = { name: accountName, website: website, stars: stars, follower_count: followerCount, image: row.image, }; await writeToCsv([pinData], `${row.name.replace(' ', '-')}.csv`); success = true; } catch (err) { await page.screenshot({ path: 'ERROR.png' }); console.log(`Error: ${err}, tries left: ${retries - tries}, url: ${url}`); tries++; } finally { await page.close(); } }} async function processResults(csvFile, location, concurrencyLimit, retries) { const pins = await readCsv(csvFile); const browser = await puppeteer.launch(); while (pins.length > 0) { const currentBatch = pins.splice(0, concurrencyLimit); const tasks = currentBatch.map((pin) => processPin(browser, pin, location, retries) ); try { await Promise.all(tasks); } catch (err) { console.log(`Failed to process batch: ${err}`); } } await browser.close();} async function main() { const keywords = ['grilling']; const concurrencyLimit = 4; const location = 'us'; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log('Crawl starting'); await startScrape(keyword, location, retries); console.log('Crawl complete'); aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`); } console.log('Starting scrape'); for (const file of aggregateFiles) { await processResults(file, location, concurrencyLimit, retries); } console.log('Scrape complete');} main();
const
variables inside of main
if you'd like to tweak your results. Try changing the following:keywords
: This list contains the keywords for which you want to scrape Pinterest search results.concurrencyLimit
: This parameter sets the number of concurrent tasks (or browser pages) that the script will process at the same time.location
: This parameter sets the geographical location from which the requests are made. It can affect the content returned by the website due to region-specific restrictions or differences.retries
: This parameter sets the maximum number of attempts the script will make to fetch data from a URL if the initial request fails. If you run into trouble, consider changing your country and maybe lowering your concurrencyLimit.
Once again, our search URL looks like this: https://www.pinterest.com/search/pins/?q=grilling&rs=typed
.https://www.pinterest.com/search/pins/?q=grilling&rs=typed
https://www.pinterest.com/search/pins/
.?
tells the server that we'd like to perform a query.&
.?q=grilling&rs=typed
.typed
is a standard query when we perform a Pinterest search on our computer.grilling
is the search we actually want to perform.https://www.pinterest.com/pin/PIN-NUMBER-GOES-HERE/
country
param which will actually route us through a server in that country.mkdir pinterest-scraper cd pinterest-scraper
npm init --y
npm install puppeteer
npm install csv-writer
npm install csv-parse
npm install fs
scrapeSearchResults()
is our parsing function.Take a look at the code below.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); await page.setJavaScriptEnabled(false); try { const url = `https://www.pinterest.com/search/pins/?q=${formattedKeyword}&rs=typed`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-grid-item='true']"); for (const divCard of divCards) { const aElement = await divCard.$('a'); const name = await page.evaluate( (element) => element.getAttribute('aria-label'), aElement ); const href = await page.evaluate( (element) => element.getAttribute('href'), aElement ); const imgElement = await divCard.$('img'); const imgLink = await page.evaluate( (element) => element.getAttribute('src'), imgElement ); const searchData = { name: name, url: `https://www.pinterest.com${href.replace('https://proxy.scrapeops.io', '')}`, image: imgLink, }; console.log(searchData); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, location, concurrencyLimit, retries) { const browser = await puppeteer.launch(); await scrapeSearchResults(browser, keyword, location, retries); await browser.close();} async function main() { const keywords = ['grilling']; const concurrencyLimit = 4; const location = 'uk'; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log('Crawl starting'); await startScrape(keyword, location, retries); console.log('Crawl complete'); aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`); }} main();
await page.$$("div[data-grid-item='true']")
finds all the result items on the page. On Pinterest, data-grid-item='true'
denotes an individual search result.await divCard.$("a")
pulls the link or <a>
element from the search result.await page.evaluate(element => element.getAttribute("aria-label"), aElement)
.await page.evaluate(element => element.getAttribute("href"), aElement)
await page.evaluate(element => element.getAttribute("src"), imgElement)
async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }}
writeToCsv()
takes an array of JSON objects and a filename. First, it checks if our outputFile
exists. If it doesn't exist, we create it. If the file does exist, we append it.This approach allows us to always write the maximum possible data to a file without overwriting existing data.In our updated code below, we adjust it to write the object to a CSV file instead of printing it to the console.const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); await page.setJavaScriptEnabled(false); try { const url = `https://www.pinterest.com/search/pins/?q=${formattedKeyword}&rs=typed`; await page.goto(url); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-grid-item='true']"); for (const divCard of divCards) { const aElement = await divCard.$('a'); const name = await page.evaluate( (element) => element.getAttribute('aria-label'), aElement ); const href = await page.evaluate( (element) => element.getAttribute('href'), aElement ); const imgElement = await divCard.$('img'); const imgLink = await page.evaluate( (element) => element.getAttribute('src'), imgElement ); const searchData = { name: name, url: `https://www.pinterest.com${href.replace('https://proxy.scrapeops.io', '')}`, image: imgLink, }; await writeToCsv([searchData], `${keyword.replace(' ', '-')}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, location, concurrencyLimit, retries) { const browser = await puppeteer.launch(); await scrapeSearchResults(browser, keyword, location, retries); await browser.close();} async function main() { const keywords = ['grilling']; const concurrencyLimit = 4; const location = 'uk'; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log('Crawl starting'); await startScrape(keyword, location, retries); console.log('Crawl complete'); aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`); }} main();
getScrapeOpsUrl()
function. While it's only a small amount of code, this function converts any regular URL into a ScrapeOps proxied URL.Another important point in our case today is the wait
parameter. If you remember from our earlier examples, we actually disable JavaScript from running inside Puppeteer. wait: 2000
tells the ScrapeOps server to wait two seconds for our content to render before sending the page back to us. We're then able to read the static page without getting blocked or redirected by the JavaScript code that Pinterest tries to execute.

function getScrapeOpsUrl(url, location = 'us') {
  const params = new URLSearchParams({
    api_key: API_KEY,
    url: url,
    country: location,
    wait: 2000,
  });
  return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}
const puppeteer = require('puppeteer');const createCsvWriter = require('csv-writer').createObjectCsvWriter;const csvParse = require('csv-parse');const fs = require('fs'); const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error('No data to write!'); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key })); const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists, }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error('Failed to write to csv'); }} function getScrapeOpsUrl(url, location = 'us') { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location, wait: 2000, }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function scrapeSearchResults( browser, keyword, pageNumber, location = 'us', retries = 3) { let tries = 0; let success = false; while (tries <= retries && !success) { const formattedKeyword = keyword.replace(' ', '+'); const page = await browser.newPage(); await page.setJavaScriptEnabled(false); try { const url = `https://www.pinterest.com/search/pins/?q=${formattedKeyword}&rs=typed`; const proxyUrl = getScrapeOpsUrl(url, location); await page.goto(proxyUrl); console.log(`Successfully fetched: ${url}`); const divCards = await page.$$("div[data-grid-item='true']"); for (const divCard of divCards) { const aElement = await divCard.$('a'); const name = await page.evaluate( (element) => element.getAttribute('aria-label'), aElement ); const href = await page.evaluate( (element) => element.getAttribute('href'), aElement ); const imgElement = await divCard.$('img'); const imgLink = await page.evaluate( (element) => element.getAttribute('src'), imgElement ); const searchData = { name: name, url: `https://www.pinterest.com${href.replace('https://proxy.scrapeops.io', '')}`, image: imgLink, }; await writeToCsv([searchData], `${keyword.replace(' ', '-')}.csv`); } success = true; } catch (err) { console.log(`Error: ${err}, tries left ${retries - tries}`); tries++; } finally { await page.close(); } }} async function startScrape(keyword, location, concurrencyLimit, retries) { const browser = await puppeteer.launch(); await scrapeSearchResults(browser, keyword, location, retries); await browser.close();} async function main() { const keywords = ['grilling']; const concurrencyLimit = 4; const location = 'uk'; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log('Crawl starting'); await startScrape(keyword, location, retries); console.log('Crawl complete'); aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`); }} main();
main
below.async function main() { const keywords = ['grilling']; const concurrencyLimit = 4; const location = 'us'; const retries = 3; const aggregateFiles = []; for (const keyword of keywords) { console.log('Crawl starting'); await startScrape(keyword, location, retries); console.log('Crawl complete'); aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`); }}
keywords
location
retries
concurrencyLimit
yet because we're not using it. It will come into play when we build our pin scraper. Here are our results. If you check the output, you should see content matching the country
you passed into the API.async function processPin(browser, row, location, retries = 3) { const url = row.url; let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(url, { timeout: 60000 }); const mainCard = await page.$("div[data-test-id='CloseupDetails']"); let website = 'n/a'; const websiteHolder = await page.$( "span[style='text-decoration: underline;']" ); if (websiteHolder) { website = await page.evaluate( (element) => element.textContent, websiteHolder ); } const starDivs = await page.$$("div[data-test-id='rating-star-full']"); const stars = starDivs.length; const profileInfoDiv = await mainCard.$( "div[data-test-id='follower-count']" ); if (profileInfoDiv === null) { throw new Error('Page failed to loaded, most likely blocked!'); } const profileText = await page.evaluate( (element) => element.textContent, profileInfoDiv ); const accountNameDiv = await profileInfoDiv.$( "div[data-test-id='creator-profile-name']" ); const nestedDiv = await accountNameDiv.$('div'); const accountName = await page.evaluate( (element) => element.getAttribute('title'), nestedDiv ); const followerCount = profileText .replace(accountName, '') .replace(' followers', ''); const pinData = { name: accountName, website: website, stars: stars, follower_count: followerCount, image: row.image, }; console.log(pinData); success = true; } catch (err) { await page.screenshot({ path: 'ERROR.png' }); console.log(`Error: ${err}, tries left: ${retries - tries}, url: ${url}`); tries++; } finally { await page.close(); } }}
Here's a breakdown of `processPin()`:

- `await page.$("div[data-test-id='CloseupDetails']")` finds the main card on the page.
- We find the `websiteHolder` with `await page.$("span[style='text-decoration: underline;']")`. If the `websiteHolder` is present, we use `await page.evaluate(element => element.textContent, websiteHolder)` to extract the `textContent` from it.
- `await mainCard.$("div[data-test-id='follower-count']")` looks for the profile section on the page. If this item isn't present, we throw an error because the page didn't load correctly.
- `await page.evaluate(element => element.getAttribute("title"), nestedDiv)` pulls the account name from our `nestedDiv`.
- We use `.replace()` to remove unneeded text and retrieve our follower count.
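To make that last step concrete, here's a quick sketch (with made-up values) of how the two `.replace()` calls peel the account name and the " followers" suffix off of the combined profile text:

```js
// Hypothetical values for illustration only -- the real text comes from page.evaluate().
const accountName = 'BBQ Pros';
const profileText = 'BBQ Pros128.4k followers';

// Strip the account name, then strip the " followers" suffix.
const followerCount = profileText.replace(accountName, '').replace(' followers', '');

console.log(followerCount); // "128.4k"
```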
Our `processPin()` function isn't very useful if it doesn't know what to scrape. We need to read the CSV file created by our crawler and then pass all the rows from the crawler into `processPin()`.

The function below takes a CSV file and reads it into an array of JSON objects.

```js
async function readCsv(inputFile) {
  const results = [];
  const parser = fs.createReadStream(inputFile).pipe(
    csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true })
  );

  for await (const record of parser) {
    results.push(record);
  }
  return results;
}
```
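As a quick sanity check, you could load the crawler's output and log each row before wiring it into the scraper. This is just a sketch; it assumes the crawl produced `grilling.csv` in the current folder:

```js
// Hypothetical usage of readCsv() -- not part of the scraper itself.
async function preview() {
  const rows = await readCsv('grilling.csv');
  console.log(`Loaded ${rows.length} pins`);
  for (const row of rows) {
    // Each row is an object keyed by the CSV headers: name, url, image.
    console.log(row.name, row.url);
  }
}

preview();
```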
After putting everything together, our full code up to this point looks like this.

```js
const puppeteer = require('puppeteer');
const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const csvParse = require('csv-parse');
const fs = require('fs');

const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key;

async function writeToCsv(data, outputFile) {
  if (!data || data.length === 0) {
    throw new Error('No data to write!');
  }
  const fileExists = fs.existsSync(outputFile);
  const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key }));

  const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists });

  try {
    await csvWriter.writeRecords(data);
  } catch (e) {
    throw new Error('Failed to write to csv');
  }
}

async function readCsv(inputFile) {
  const results = [];
  const parser = fs.createReadStream(inputFile).pipe(
    csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true })
  );

  for await (const record of parser) {
    results.push(record);
  }
  return results;
}

function getScrapeOpsUrl(url, location = 'us') {
  const params = new URLSearchParams({
    api_key: API_KEY,
    url: url,
    country: location,
    wait: 2000,
  });
  return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

async function scrapeSearchResults(browser, keyword, location = 'us', retries = 3) {
  let tries = 0;
  let success = false;

  while (tries <= retries && !success) {
    const formattedKeyword = keyword.replace(' ', '+');
    const page = await browser.newPage();
    await page.setJavaScriptEnabled(false);

    try {
      const url = `https://www.pinterest.com/search/pins/?q=${formattedKeyword}&rs=typed`;
      const proxyUrl = getScrapeOpsUrl(url, location);
      await page.goto(proxyUrl);
      console.log(`Successfully fetched: ${url}`);

      const divCards = await page.$$("div[data-grid-item='true']");

      for (const divCard of divCards) {
        const aElement = await divCard.$('a');
        const name = await page.evaluate((element) => element.getAttribute('aria-label'), aElement);
        const href = await page.evaluate((element) => element.getAttribute('href'), aElement);
        const imgElement = await divCard.$('img');
        const imgLink = await page.evaluate((element) => element.getAttribute('src'), imgElement);

        const searchData = {
          name: name,
          url: `https://www.pinterest.com${href.replace('https://proxy.scrapeops.io', '')}`,
          image: imgLink,
        };

        await writeToCsv([searchData], `${keyword.replace(' ', '-')}.csv`);
      }
      success = true;
    } catch (err) {
      console.log(`Error: ${err}, tries left ${retries - tries}`);
      tries++;
    } finally {
      await page.close();
    }
  }
}

async function startScrape(keyword, location, retries) {
  const browser = await puppeteer.launch();
  await scrapeSearchResults(browser, keyword, location, retries);
  await browser.close();
}

async function processPin(browser, row, location, retries = 3) {
  const url = row.url;
  let tries = 0;
  let success = false;

  while (tries <= retries && !success) {
    const page = await browser.newPage();
    try {
      await page.goto(url, { timeout: 60000 });

      const mainCard = await page.$("div[data-test-id='CloseupDetails']");

      let website = 'n/a';
      const websiteHolder = await page.$("span[style='text-decoration: underline;']");
      if (websiteHolder) {
        website = await page.evaluate((element) => element.textContent, websiteHolder);
      }

      const starDivs = await page.$$("div[data-test-id='rating-star-full']");
      const stars = starDivs.length;

      const profileInfoDiv = await mainCard.$("div[data-test-id='follower-count']");
      if (profileInfoDiv === null) {
        throw new Error('Page failed to load, most likely blocked!');
      }
      const profileText = await page.evaluate((element) => element.textContent, profileInfoDiv);
      const accountNameDiv = await profileInfoDiv.$("div[data-test-id='creator-profile-name']");
      const nestedDiv = await accountNameDiv.$('div');
      const accountName = await page.evaluate((element) => element.getAttribute('title'), nestedDiv);
      const followerCount = profileText.replace(accountName, '').replace(' followers', '');

      const pinData = {
        name: accountName,
        website: website,
        stars: stars,
        follower_count: followerCount,
        image: row.image,
      };
      console.log(pinData);
      success = true;
    } catch (err) {
      await page.screenshot({ path: 'ERROR.png' });
      console.log(`Error: ${err}, tries left: ${retries - tries}, url: ${url}`);
      tries++;
    } finally {
      await page.close();
    }
  }
}

async function processResults(csvFile, location, concurrencyLimit, retries) {
  const pins = await readCsv(csvFile);
  const browser = await puppeteer.launch();

  for (const pin of pins) {
    await processPin(browser, pin, location, retries);
  }

  await browser.close();
}

async function main() {
  const keywords = ['grilling'];
  const concurrencyLimit = 4;
  const location = 'uk';
  const retries = 3;
  const aggregateFiles = [];

  for (const keyword of keywords) {
    console.log('Crawl starting');
    await startScrape(keyword, location, retries);
    console.log('Crawl complete');
    aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`);
  }

  console.log('Starting scrape');
  for (const file of aggregateFiles) {
    await processResults(file, location, concurrencyLimit, retries);
  }
  console.log('Scrape complete');
}

main();
```
We already have our `writeToCsv()` function from earlier; we just need to put it in the right place. Instead of logging each pin item to the console, we're going to do this:

```js
await writeToCsv([pinData], `${row.name.replace(' ', '-')}.csv`);
```
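Because `writeToCsv()` sets `append` based on whether the file already exists, calling it repeatedly with single-item arrays keeps adding rows to the same report and only writes the header once. Here's a small sketch of that behavior with made-up data:

```js
// Hypothetical demo of the append behavior -- not part of the scraper itself.
async function demoAppend() {
  const file = 'demo-report.csv';
  // First call: the file doesn't exist yet, so the header row gets written.
  await writeToCsv([{ name: 'first pin', stars: 5 }], file);
  // Second call: the file exists, so this row is appended without a second header.
  await writeToCsv([{ name: 'second pin', stars: 4 }], file);
}

demoAppend();
```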
With that one change in place, here is our full code with data storage added to the scraper.

```js
const puppeteer = require('puppeteer');
const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const csvParse = require('csv-parse');
const fs = require('fs');

const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key;

async function writeToCsv(data, outputFile) {
  if (!data || data.length === 0) {
    throw new Error('No data to write!');
  }
  const fileExists = fs.existsSync(outputFile);
  const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key }));

  const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists });

  try {
    await csvWriter.writeRecords(data);
  } catch (e) {
    throw new Error('Failed to write to csv');
  }
}

async function readCsv(inputFile) {
  const results = [];
  const parser = fs.createReadStream(inputFile).pipe(
    csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true })
  );

  for await (const record of parser) {
    results.push(record);
  }
  return results;
}

function getScrapeOpsUrl(url, location = 'us') {
  const params = new URLSearchParams({
    api_key: API_KEY,
    url: url,
    country: location,
    wait: 2000,
  });
  return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

async function scrapeSearchResults(browser, keyword, location = 'us', retries = 3) {
  let tries = 0;
  let success = false;

  while (tries <= retries && !success) {
    const formattedKeyword = keyword.replace(' ', '+');
    const page = await browser.newPage();
    await page.setJavaScriptEnabled(false);

    try {
      const url = `https://www.pinterest.com/search/pins/?q=${formattedKeyword}&rs=typed`;
      const proxyUrl = getScrapeOpsUrl(url, location);
      await page.goto(proxyUrl);
      console.log(`Successfully fetched: ${url}`);

      const divCards = await page.$$("div[data-grid-item='true']");

      for (const divCard of divCards) {
        const aElement = await divCard.$('a');
        const name = await page.evaluate((element) => element.getAttribute('aria-label'), aElement);
        const href = await page.evaluate((element) => element.getAttribute('href'), aElement);
        const imgElement = await divCard.$('img');
        const imgLink = await page.evaluate((element) => element.getAttribute('src'), imgElement);

        const searchData = {
          name: name,
          url: `https://www.pinterest.com${href.replace('https://proxy.scrapeops.io', '')}`,
          image: imgLink,
        };

        await writeToCsv([searchData], `${keyword.replace(' ', '-')}.csv`);
      }
      success = true;
    } catch (err) {
      console.log(`Error: ${err}, tries left ${retries - tries}`);
      tries++;
    } finally {
      await page.close();
    }
  }
}

async function startScrape(keyword, location, retries) {
  const browser = await puppeteer.launch();
  await scrapeSearchResults(browser, keyword, location, retries);
  await browser.close();
}

async function processPin(browser, row, location, retries = 3) {
  const url = row.url;
  let tries = 0;
  let success = false;

  while (tries <= retries && !success) {
    const page = await browser.newPage();
    try {
      await page.goto(url, { timeout: 60000 });

      const mainCard = await page.$("div[data-test-id='CloseupDetails']");

      let website = 'n/a';
      const websiteHolder = await page.$("span[style='text-decoration: underline;']");
      if (websiteHolder) {
        website = await page.evaluate((element) => element.textContent, websiteHolder);
      }

      const starDivs = await page.$$("div[data-test-id='rating-star-full']");
      const stars = starDivs.length;

      const profileInfoDiv = await mainCard.$("div[data-test-id='follower-count']");
      if (profileInfoDiv === null) {
        throw new Error('Page failed to load, most likely blocked!');
      }
      const profileText = await page.evaluate((element) => element.textContent, profileInfoDiv);
      const accountNameDiv = await profileInfoDiv.$("div[data-test-id='creator-profile-name']");
      const nestedDiv = await accountNameDiv.$('div');
      const accountName = await page.evaluate((element) => element.getAttribute('title'), nestedDiv);
      const followerCount = profileText.replace(accountName, '').replace(' followers', '');

      const pinData = {
        name: accountName,
        website: website,
        stars: stars,
        follower_count: followerCount,
        image: row.image,
      };
      await writeToCsv([pinData], `${row.name.replace(' ', '-')}.csv`);
      success = true;
    } catch (err) {
      await page.screenshot({ path: 'ERROR.png' });
      console.log(`Error: ${err}, tries left: ${retries - tries}, url: ${url}`);
      tries++;
    } finally {
      await page.close();
    }
  }
}

async function processResults(csvFile, location, concurrencyLimit, retries) {
  const pins = await readCsv(csvFile);
  const browser = await puppeteer.launch();

  for (const pin of pins) {
    await processPin(browser, pin, location, retries);
  }

  await browser.close();
}

async function main() {
  const keywords = ['grilling'];
  const concurrencyLimit = 4;
  const location = 'uk';
  const retries = 3;
  const aggregateFiles = [];

  for (const keyword of keywords) {
    console.log('Crawl starting');
    await startScrape(keyword, location, retries);
    console.log('Crawl complete');
    aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`);
  }

  console.log('Starting scrape');
  for (const file of aggregateFiles) {
    await processResults(file, location, concurrencyLimit, retries);
  }
  console.log('Scrape complete');
}

main();
```
To scrape multiple pins at once, we need to change our `processResults()` function to look like this.

```js
async function processResults(csvFile, location, concurrencyLimit, retries) {
  const pins = await readCsv(csvFile);
  const browser = await puppeteer.launch();

  while (pins.length > 0) {
    const currentBatch = pins.splice(0, concurrencyLimit);
    const tasks = currentBatch.map((pin) => processPin(browser, pin, location, retries));

    try {
      await Promise.all(tasks);
    } catch (err) {
      console.log(`Failed to process batch: ${err}`);
    }
  }

  await browser.close();
}
```
This time, we use a `while` loop. While `pins` is longer than 0, we splice from index 0 up to our `concurrencyLimit`. This shortens the array (therefore reducing its size in memory) and also runs `processPin()` on each row we spliced from the array. Once `await Promise.all(tasks)` resolves, we repeat the process, constantly shrinking the array and improving performance as time goes on.
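If the splice-and-batch pattern is new to you, here's a stripped-down sketch of the same idea with a dummy task; `sleep()` and the `items` array are made up purely for this demo:

```js
// Minimal illustration of batched concurrency with splice() and Promise.all().
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function runBatches(items, concurrencyLimit) {
  while (items.length > 0) {
    // Remove the next batch from the front of the array.
    const currentBatch = items.splice(0, concurrencyLimit);
    // Start every task in the batch, then wait for all of them to finish.
    await Promise.all(
      currentBatch.map(async (item) => {
        await sleep(100);
        console.log(`Processed ${item}`);
      })
    );
  }
}

runBatches([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 4);
```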
Finally, we need to route `processPin()` through the proxy. We need to replace `page.goto(url)` with the following line.

```js
await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 });
```

For extra redundancy, in `getScrapeOpsUrl()`, we'll be setting `residential` to true. Adding the `residential` argument reduces the likelihood that Pinterest will block the proxy. During extensive testing, the Pinterest server was able to detect and block the scraper a good portion of the time when not using `residential`.

Here is our updated proxy function.

```js
function getScrapeOpsUrl(url, location = 'us') {
  const params = new URLSearchParams({
    api_key: API_KEY,
    url: url,
    country: location,
    wait: 3000,
    residential: true,
  });
  return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}
```
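To see what the scraper actually requests, you can log the proxied URL for a pin. The pin URL below is made up purely for illustration:

```js
// Hypothetical pin URL -- for illustration only.
const example = getScrapeOpsUrl('https://www.pinterest.com/pin/1234567890/', 'us');
console.log(example);
// Logs something like:
// https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.pinterest.com%2Fpin%2F1234567890%2F&country=us&wait=3000&residential=true
```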
After all of these changes, here is our final, production-ready code.

```js
const puppeteer = require('puppeteer');
const createCsvWriter = require('csv-writer').createObjectCsvWriter;
const csvParse = require('csv-parse');
const fs = require('fs');

const API_KEY = JSON.parse(fs.readFileSync('config.json')).api_key;

async function writeToCsv(data, outputFile) {
  if (!data || data.length === 0) {
    throw new Error('No data to write!');
  }
  const fileExists = fs.existsSync(outputFile);
  const headers = Object.keys(data[0]).map((key) => ({ id: key, title: key }));

  const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists });

  try {
    await csvWriter.writeRecords(data);
  } catch (e) {
    throw new Error('Failed to write to csv');
  }
}

async function readCsv(inputFile) {
  const results = [];
  const parser = fs.createReadStream(inputFile).pipe(
    csvParse.parse({ columns: true, delimiter: ',', trim: true, skip_empty_lines: true })
  );

  for await (const record of parser) {
    results.push(record);
  }
  return results;
}

function getScrapeOpsUrl(url, location = 'us') {
  const params = new URLSearchParams({
    api_key: API_KEY,
    url: url,
    country: location,
    residential: true,
    wait: 3000,
  });
  return `https://proxy.scrapeops.io/v1/?${params.toString()}`;
}

async function scrapeSearchResults(browser, keyword, location = 'us', retries = 3) {
  let tries = 0;
  let success = false;

  while (tries <= retries && !success) {
    const formattedKeyword = keyword.replace(' ', '+');
    const page = await browser.newPage();
    await page.setJavaScriptEnabled(false);

    try {
      const url = `https://www.pinterest.com/search/pins/?q=${formattedKeyword}&rs=typed`;
      const proxyUrl = getScrapeOpsUrl(url, location);
      await page.goto(proxyUrl);
      console.log(`Successfully fetched: ${url}`);

      const divCards = await page.$$("div[data-grid-item='true']");

      for (const divCard of divCards) {
        const aElement = await divCard.$('a');
        const name = await page.evaluate((element) => element.getAttribute('aria-label'), aElement);
        const href = await page.evaluate((element) => element.getAttribute('href'), aElement);
        const imgElement = await divCard.$('img');
        const imgLink = await page.evaluate((element) => element.getAttribute('src'), imgElement);

        const searchData = {
          name: name,
          url: `https://www.pinterest.com${href.replace('https://proxy.scrapeops.io', '')}`,
          image: imgLink,
        };

        await writeToCsv([searchData], `${keyword.replace(' ', '-')}.csv`);
      }
      success = true;
    } catch (err) {
      console.log(`Error: ${err}, tries left ${retries - tries}`);
      tries++;
    } finally {
      await page.close();
    }
  }
}

async function startScrape(keyword, location, retries) {
  const browser = await puppeteer.launch();
  await scrapeSearchResults(browser, keyword, location, retries);
  await browser.close();
}

async function processPin(browser, row, location, retries = 3) {
  const url = row.url;
  let tries = 0;
  let success = false;

  while (tries <= retries && !success) {
    const page = await browser.newPage();
    try {
      await page.goto(getScrapeOpsUrl(url, location), { timeout: 60000 });

      const mainCard = await page.$("div[data-test-id='CloseupDetails']");

      let website = 'n/a';
      const websiteHolder = await page.$("span[style='text-decoration: underline;']");
      if (websiteHolder) {
        website = await page.evaluate((element) => element.textContent, websiteHolder);
      }

      const starDivs = await page.$$("div[data-test-id='rating-star-full']");
      const stars = starDivs.length;

      const profileInfoDiv = await mainCard.$("div[data-test-id='follower-count']");
      if (profileInfoDiv === null) {
        throw new Error('Page failed to load, most likely blocked!');
      }
      const profileText = await page.evaluate((element) => element.textContent, profileInfoDiv);
      const accountNameDiv = await profileInfoDiv.$("div[data-test-id='creator-profile-name']");
      const nestedDiv = await accountNameDiv.$('div');
      const accountName = await page.evaluate((element) => element.getAttribute('title'), nestedDiv);
      const followerCount = profileText.replace(accountName, '').replace(' followers', '');

      const pinData = {
        name: accountName,
        website: website,
        stars: stars,
        follower_count: followerCount,
        image: row.image,
      };
      await writeToCsv([pinData], `${row.name.replace(' ', '-')}.csv`);
      success = true;
    } catch (err) {
      await page.screenshot({ path: 'ERROR.png' });
      console.log(`Error: ${err}, tries left: ${retries - tries}, url: ${url}`);
      tries++;
    } finally {
      await page.close();
    }
  }
}

async function processResults(csvFile, location, concurrencyLimit, retries) {
  const pins = await readCsv(csvFile);
  const browser = await puppeteer.launch();

  while (pins.length > 0) {
    const currentBatch = pins.splice(0, concurrencyLimit);
    const tasks = currentBatch.map((pin) => processPin(browser, pin, location, retries));

    try {
      await Promise.all(tasks);
    } catch (err) {
      console.log(`Failed to process batch: ${err}`);
    }
  }

  await browser.close();
}

async function main() {
  const keywords = ['grilling'];
  const concurrencyLimit = 4;
  const location = 'uk';
  const retries = 3;
  const aggregateFiles = [];

  for (const keyword of keywords) {
    console.log('Crawl starting');
    await startScrape(keyword, location, retries);
    console.log('Crawl complete');
    aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`);
  }

  console.log('Starting scrape');
  for (const file of aggregateFiles) {
    await processResults(file, location, concurrencyLimit, retries);
  }
  console.log('Scrape complete');
}

main();
```
As before, you can tweak your results by changing the constants in the `main` function that we'll be running.

```js
async function main() {
  const keywords = ['grilling'];
  const concurrencyLimit = 4;
  const location = 'us';
  const retries = 3;
  const aggregateFiles = [];

  for (const keyword of keywords) {
    console.log('Crawl starting');
    await startScrape(keyword, location, retries);
    console.log('Crawl complete');
    aggregateFiles.push(`${keyword.replace(' ', '-')}.csv`);
  }

  console.log('Starting scrape');
  for (const file of aggregateFiles) {
    await processResults(file, location, concurrencyLimit, retries);
  }
  console.log('Scrape complete');
}
```
When you scrape Pinterest (or any other site), you are subject to its Terms of Service and `robots.txt`. Pinterest's terms are available here. If you violate these terms, you can even lose your Pinterest account! Their `robots.txt` is available here.

Also, keep in mind whether the data you're scraping is public. Private data (data behind a login) can often be illegal to scrape. Generally, public data (data not behind a login) is public information and therefore fair game when scraping.

If you are unsure of the legality of your scraper, it is best to consult an attorney based in your jurisdiction.