Then check out ScrapeOps, the complete toolkit for web scraping.
To turn a Subreddit into a JSON feed, all we need to do is add .json
to the end of our url. In the example below, we've got a production ready Reddit scraper:import requestsfrom urllib.parse import urlencodeimport csv, json, timeimport logging, osfrom dataclasses import dataclass, field, fields, asdictfrom concurrent.futures import ThreadPoolExecutor headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" author: str = "" permalink: str = "" upvote_ratio: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CommentData: name: str = "" body: str = "" upvotes: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url #get posts from a subredditdef get_posts(feed, limit=100, retries=3, data_pipline=None): tries = 0 success = False while tries <= retries and not success: try: url = f"https://www.reddit.com/r/{feed}.json?limit={limit}" proxy_url = get_scrapeops_url(url) resp = requests.get(proxy_url, headers=headers) if resp.status_code == 200: success = True children = resp.json()["data"]["children"] for child in children: data = child["data"] #extract individual fields from the site data article_data = SearchData( name=data["title"], author=data["author_fullname"], permalink=data["permalink"], upvote_ratio=data["upvote_ratio"] ) data_pipline.add_data(article_data) else: logger.warning(f"Failed response: {resp.status_code}") raise Exception("Failed to get posts") except Exception as e: logger.warning(f"Exeception, failed to get posts: {e}") tries += 1 #process an individual postdef process_post(post_object, location="us", retries=3): tries = 0 success = False permalink = post_object["permalink"] r_url = f"https://www.reddit.com{permalink}.json" link_array = permalink.split("/") filename = link_array[-2].replace(" ", "-") comment_pipeline = DataPipeline(csv_filename=f"{filename}.csv") while tries <= retries and not success: try: comment_data = requests.get(get_scrapeops_url(r_url, location=location), headers=headers) if comment_data.status_code != 200: raise Exception(f"Failed response: {comment_data.status_code}") comments = comment_data.json() comments_list = comments[1]["data"]["children"] for comment in comments_list: if comment["kind"] != "more": data = comment["data"] comment_data = CommentData( name=data["author"], body=data["body"], upvotes=data["ups"] ) comment_pipeline.add_data(comment_data) comment_pipeline.close_pipeline() success = True except Exception as e: logger.warning(f"Failed to retrieve comment:\n{e}") tries += 1 if not success: raise Exception(f"Max retries exceeded {retries}") #process a batch of postsdef process_posts(csv_file, max_workers=5, location="us", retries=3): with open(csv_file, newline="") as csvfile: reader = list(csv.DictReader(csvfile)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map(process_post, reader, [location] * len(reader), [retries] * len(reader)) ########### MAIN FUNCTION ############# if __name__ == "__main__": FEEDS = ["news"] LOCATION = "us" BATCH_SIZE = 100 MAX_THREADS = 5 AGGREGATED_FEEDS = [] for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipline=feed_pipeline) feed_pipeline.close_pipeline() AGGREGATED_FEEDS.append(f"{feed_filename}.csv") for individual_file in AGGREGATED_FEEDS: process_posts(individual_file, location=LOCATION, max_workers=MAX_THREADS)
To customize your results, you can change the FEEDS array and the MAX_THREADS constant, or set BATCH_SIZE to 100.

Here is a standard Reddit URL:

https://www.reddit.com/r/news/?rdt=51809

And here is the JSON feed for the same page:

https://www.reddit.com/r/news.json

By changing /?rdt=51809 to .json, we've turned Reddit into a full-blown feed!

Here is an example of a JSON object:

{"name": "John Doe", "age": 30}

Compare that with the equivalent object literal:

{name: "John Doe", age: 30}

To access a dict in Python we simply use its keys. Our entire list of content comes in our resp.json(). To access each article in the list, all we have to do is change our index:

resp.json()["data"]["children"][0]
resp.json()["data"]["children"][1]
resp.json()["data"]["children"][2]

Our feed url is https://www.reddit.com/r/news.json. We can add a limit parameter to this url for finer control of our results. If we want 100 news results, our url would look like this:

https://www.reddit.com/r/news.json?limit=100
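Before building the full scraper, here is a minimal sketch of reading that feed with Requests (the browser-style User-Agent is the same idea used throughout this article, and the field names come from Reddit's JSON):

import requests

# A browser-like User-Agent so the public .json feed responds normally
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}

url = "https://www.reddit.com/r/news.json?limit=100"
resp = requests.get(url, headers=headers)

if resp.status_code == 200:
    # The posts live in a plain Python list we can index into
    children = resp.json()["data"]["children"]
    first_post = children[0]["data"]
    print(first_post["title"], first_post["upvote_ratio"])
else:
    print(f"Request failed: {resp.status_code}")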
Create a new project folder called reddit-scraper:

mkdir reddit-scraper

Then create a virtual environment, activate it, and install Requests:

python -m venv venv
source venv/bin/activate
pip install requests
The heart of our scraper is get_posts(). This function takes one argument, feed, and a kwarg, retries, which is set to 3 by default. While we still have retries left, we try to fetch the json feed from Reddit. If we run out of retries, we raise an Exception and allow the crawler to crash.

Our posts come back in an array called children. Each item in this array represents an individual Reddit post, and each post holds its useful information in a data field. From that data field, we pull the title, author_fullname, permalink, and upvote_ratio.
import requests
from urllib.parse import urlencode
import csv, json, time
import logging, os
from dataclasses import dataclass, field, fields, asdict
from concurrent.futures import ThreadPoolExecutor

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#get posts from a subreddit
def get_posts(feed, retries=3):
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            url = f"https://www.reddit.com/r/{feed}.json"
            resp = requests.get(url, headers=headers)
            if resp.status_code == 200:
                success = True
                children = resp.json()["data"]["children"]
                for child in children:
                    data = child["data"]
                    #extract individual fields from the site data
                    name = data["title"]
                    author = data["author_fullname"]
                    permalink = data["permalink"]
                    upvote_ratio = data["upvote_ratio"]
                    #print the extracted data
                    print(f"Name: {name}")
                    print(f"Author: {author}")
                    print(f"Permalink: {permalink}")
                    print(f"Upvote Ratio: {upvote_ratio}")
            else:
                logger.warning(f"Failed response: {resp.status_code}")
                raise Exception("Failed to get posts")
        except Exception as e:
            logger.warning(f"Exeception, failed to get posts: {e}")
            tries += 1

########### MAIN FUNCTION #############

if __name__ == "__main__":
    FEEDS = ["news"]
    for feed in FEEDS:
        get_posts(feed)
To control the size of our results, we add a limit parameter to our url. Let's refactor our get_posts() function to take an additional keyword, limit. Taking our limit into account, our url will now look like this:

https://www.reddit.com/r/{feed}.json?limit={limit}
import requests
from urllib.parse import urlencode
import csv, json, time
import logging, os
from dataclasses import dataclass, field, fields, asdict
from concurrent.futures import ThreadPoolExecutor

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#get posts from a subreddit
def get_posts(feed, limit=100, retries=3):
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            url = f"https://www.reddit.com/r/{feed}.json?limit={limit}"
            resp = requests.get(url, headers=headers)
            if resp.status_code == 200:
                success = True
                children = resp.json()["data"]["children"]
                for child in children:
                    data = child["data"]
                    #extract individual fields from the site data
                    name = data["title"]
                    author = data["author_fullname"]
                    permalink = data["permalink"]
                    upvote_ratio = data["upvote_ratio"]
                    #print the extracted data
                    print(f"Name: {name}")
                    print(f"Author: {author}")
                    print(f"Permalink: {permalink}")
                    print(f"Upvote Ratio: {upvote_ratio}")
            else:
                logger.warning(f"Failed response: {resp.status_code}")
                raise Exception("Failed to get posts")
        except Exception as e:
            logger.warning(f"Exeception, failed to get posts: {e}")
            tries += 1

########### MAIN FUNCTION #############

if __name__ == "__main__":
    FEEDS = ["news"]
    BATCH_SIZE = 10
    for feed in FEEDS:
        get_posts(feed, limit=BATCH_SIZE)
In the code above, we add a limit parameter to get_posts(). We also declare a new constant in our main, BATCH_SIZE. We pass our BATCH_SIZE into get_posts() to control the size of our results. Feel free to try changing the batch size and examining your results. limit is incredibly important: we don't want to scrape through hundreds of results if we only need 10... and we certainly don't want to try scraping hundreds of results when we're only limited to 10!

Next, we add a SearchData class and a DataPipeline class as well. SearchData is going to be relatively simple; all it's going to do is hold individual data. DataPipeline will be doing the real heavy lifting. Our DataPipeline class will be doing all the work of removing duplicates and saving our SearchData objects to CSV. In this example, we utilize SearchData to hold the data we've extracted and we then pass it into the DataPipeline.
import requests
from urllib.parse import urlencode
import csv, json, time
import logging, os
from dataclasses import dataclass, field, fields, asdict
from concurrent.futures import ThreadPoolExecutor

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str = ""
    author: str = ""
    permalink: str = ""
    upvote_ratio: float = 0.0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == '':
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename='', storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

#get posts from a subreddit
def get_posts(feed, limit=100, retries=3, data_pipline=None):
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            url = f"https://www.reddit.com/r/{feed}.json?limit={limit}"
            resp = requests.get(url, headers=headers)
            if resp.status_code == 200:
                success = True
                children = resp.json()["data"]["children"]
                for child in children:
                    data = child["data"]
                    #extract individual fields from the site data
                    article_data = SearchData(
                        name=data["title"],
                        author=data["author_fullname"],
                        permalink=data["permalink"],
                        upvote_ratio=data["upvote_ratio"]
                    )
                    data_pipline.add_data(article_data)
            else:
                logger.warning(f"Failed response: {resp.status_code}")
                raise Exception("Failed to get posts")
        except Exception as e:
            logger.warning(f"Exeception, failed to get posts: {e}")
            tries += 1

########### MAIN FUNCTION #############

if __name__ == "__main__":
    FEEDS = ["news"]
    BATCH_SIZE = 10

    for feed in FEEDS:
        feed_filename = feed.replace(" ", "-")
        feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv")
        get_posts(feed, limit=BATCH_SIZE, data_pipline=feed_pipeline)
        feed_pipeline.close_pipeline()
In our main function, we now open a DataPipeline and pass it into get_posts(). From inside get_posts(), we get our post data and turn it into a SearchData object. This object then gets passed into the DataPipeline, which removes duplicates and saves everything to a CSV. Once we've gone through and processed the posts, we close the pipeline.

To route our traffic through the ScrapeOps proxy, we add a new function, get_scrapeops_url(). This function takes in a regular url and uses simple string formatting to create a proxied url.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
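As a quick sanity check, you can print what the wrapper returns; the exact string below is simply the shape produced by urlencode() with the placeholder API key:

# Hypothetical quick check of the proxied url format
print(get_scrapeops_url("https://www.reddit.com/r/news.json?limit=100"))
# https://proxy.scrapeops.io/v1/?api_key=YOUR-SUPER-SECRET-API-KEY&url=https%3A%2F%2Fwww.reddit.com%2Fr%2Fnews.json%3Flimit%3D100&country=us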
import requestsfrom urllib.parse import urlencodeimport csv, json, timeimport logging, osfrom dataclasses import dataclass, field, fields, asdictfrom concurrent.futures import ThreadPoolExecutor headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" author: str = "" permalink: str = "" upvote_ratio: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url #get posts from a subredditdef get_posts(feed, limit=100, retries=3, data_pipline=None): tries = 0 success = False while tries <= retries and not success: try: url = f"https://www.reddit.com/r/{feed}.json?limit={limit}" proxy_url = get_scrapeops_url(url) resp = requests.get(proxy_url, headers=headers) if resp.status_code == 200: success = True children = resp.json()["data"]["children"] for child in children: data = child["data"] #extract individual fields from the site data article_data = SearchData( name=data["title"], author=data["author_fullname"], permalink=data["permalink"], upvote_ratio=data["upvote_ratio"] ) data_pipline.add_data(article_data) else: logger.warning(f"Failed response: {resp.status_code}") raise Exception("Failed to get posts") except Exception as e: logger.warning(f"Exeception, failed to get posts: {e}") tries += 1 ########### MAIN FUNCTION ############# if __name__ == "__main__": FEEDS = ["news"] BATCH_SIZE = 10 for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipline=feed_pipeline) feed_pipeline.close_pipeline()
Inside get_posts(), we now create a proxy_url by passing our url into get_scrapeops_url(). We then send our request to the proxy_url instead of our url. Everything else in our code remains the same.

########### MAIN FUNCTION #############

if __name__ == "__main__":
    FEEDS = ["news"]
    BATCH_SIZE = 100

    for feed in FEEDS:
        feed_filename = feed.replace(" ", "-")
        feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv")
        get_posts(feed, limit=BATCH_SIZE, data_pipline=feed_pipeline)
        feed_pipeline.close_pipeline()
Our scraper builds each post's url from the permalink from post objects we created earlier in our crawler. We're not ready to run this code yet; we need to be able to read the CSV file we created earlier.

#process an individual post
def process_post(post_object, retries=3, max_workers=5, location="us"):
    tries = 0
    success = False

    permalink = post_object["permalink"]
    r_url = f"https://www.reddit.com{permalink}.json"
    link_array = permalink.split("/")
    filename = link_array[-2].replace(" ", "-")

    comment_data = requests.get(r_url)
    comments = comment_data.json()
    if not isinstance(comments, list):
        return None

    comments_list = comments[1]["data"]["children"]

    while tries <= retries and not success:
        try:
            for comment in comments_list:
                if comment["kind"] != "more":
                    data = comment["data"]
                    comment_data = {
                        "name": data["author"],
                        "body": data["body"],
                        "upvotes": data["ups"]
                    }
                    print(f"Comment: {comment_data}")
            success = True
        except Exception as e:
            logger.warning(f"Failed to retrieve comment:\n{e}")
            tries += 1

    if not success:
        raise Exception(f"Max retries exceeded {retries}")
We use isinstance() to enforce type checking here and only parse comment lists. If our response is not a list, we return None and exit the function.

To read the CSV our crawler produced, we use csv.DictReader(), which allows us to read individual rows from the CSV file. We'll call process_post() on each row we read from the file.

Here is the full code example that reads rows from the CSV file and processes them. We have an additional function, process_posts(). It uses a for
loop as just a placeholder for now, but later on, this function will be rewritten to use multithreading.import requestsfrom urllib.parse import urlencodeimport csv, json, timeimport logging, osfrom dataclasses import dataclass, field, fields, asdictfrom concurrent.futures import ThreadPoolExecutor headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" author: str = "" permalink: str = "" upvote_ratio: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url #get posts from a subredditdef get_posts(feed, limit=100, retries=3, data_pipline=None): tries = 0 success = False while tries <= retries and not success: try: url = f"https://www.reddit.com/r/{feed}.json?limit={limit}" proxy_url = get_scrapeops_url(url) resp = requests.get(proxy_url, headers=headers) if resp.status_code == 200: success = True children = resp.json()["data"]["children"] for child in children: data = child["data"] #extract individual fields from the site data article_data = SearchData( name=data["title"], author=data["author_fullname"], permalink=data["permalink"], upvote_ratio=data["upvote_ratio"] ) data_pipline.add_data(article_data) else: logger.warning(f"Failed response: {resp.status_code}") raise Exception("Failed to get posts") except Exception as e: logger.warning(f"Exeception, failed to get posts: {e}") tries += 1 #process an individual postdef process_post(post_object, retries=3, max_workers=5, location="us"): tries = 0 success = False permalink = post_object["permalink"] r_url = f"https://www.reddit.com{permalink}.json" link_array = permalink.split("/") filename = link_array[-2].replace(" ", "-") comment_data = requests.get(r_url) comments = comment_data.json() if not isinstance(comments, list): return None comments_list = comments[1]["data"]["children"] while tries <= retries and not success: try: for comment in comments_list: if comment["kind"] != "more": data = comment["data"] comment_data = { "name": data["author"], "body": data["body"], "upvotes": data["ups"] } print(f"Comment: {comment_data}") success = True except Exception as e: logger.warning(f"Failed to retrieve comment:\n{e}") tries += 1 if not success: raise Exception(f"Max retries exceeded {retries}") #process a batch of postsdef process_posts(csv_file, max_workers=5, location="us"): with open(csv_file, newline="") as csvfile: reader = list(csv.DictReader(csvfile)) for row in reader: process_post(row) ########### MAIN FUNCTION ############# if __name__ == "__main__": FEEDS = ["news"] BATCH_SIZE = 10 AGGREGATED_FEEDS = [] for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipline=feed_pipeline) feed_pipeline.close_pipeline() AGGREGATED_FEEDS.append(f"{feed_filename}.csv") for individual_file in AGGREGATED_FEEDS: process_posts(individual_file)
In the code above, we use process_posts() to read all the data from a Subreddit CSV. This function runs process_post() on each individual post so we can extract important comment data from the post.

Next, we add another dataclass, CommentData. Similar to SearchData, the purpose of this class is to simply hold the data that we want to scrape.

@dataclass
class CommentData:
    name: str = ""
    body: str = ""
    upvotes: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == '':
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
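As a quick illustration (not part of the scraper itself), __post_init__ fills in placeholder text for empty strings and strips stray whitespace:

# Toy example of CommentData's field cleanup
comment = CommentData(name="", body="  Great article!  ", upvotes=12)
print(comment.name)     # "No name"
print(comment.body)     # "Great article!"
print(comment.upvotes)  # 12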
Here is process_post() rewritten to store our data.

def process_post(post_object, retries=3, location="us"):
    tries = 0
    success = False

    permalink = post_object["permalink"]
    r_url = f"https://www.reddit.com{permalink}.json"
    link_array = permalink.split("/")
    filename = link_array[-2].replace(" ", "-")

    comment_pipeline = DataPipeline(csv_filename=f"{filename}.csv")

    while tries <= retries and not success:
        try:
            comment_data = requests.get(r_url, headers=headers)
            if comment_data.status_code != 200:
                raise Exception(f"Failed response: {comment_data.status_code}")
            comments = comment_data.json()
            comments_list = comments[1]["data"]["children"]
            for comment in comments_list:
                if comment["kind"] != "more":
                    data = comment["data"]
                    comment_data = CommentData(
                        name=data["author"],
                        body=data["body"],
                        upvotes=data["ups"]
                    )
                    comment_pipeline.add_data(comment_data)
            comment_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.warning(f"Failed to retrieve comment:\n{e}")
            tries += 1

    if not success:
        raise Exception(f"Max retries exceeded {retries}")
Notice that each post now opens a DataPipeline of its own. Each post gets its own DataPipeline so we can store the comment data efficiently.

If you add this code to your scraper now, it might run for a little while, but it will almost definitely crash with a status code 429. This is because our scraper is much faster than a normal human being and the Reddit server does not prioritize our responses. 429 simply means that the server doesn't have the resources to handle all of our requests.

To process posts concurrently, we use ThreadPoolExecutor. This allows us to open a new pool of threads with however many max_workers we want to specify. This is actually going to increase our likelihood of getting blocked, so adding proxy support in the next section is super important!

Here is our new process_posts():

#process a batch of posts
def process_posts(csv_file, max_workers=5, location="us", retries=3):
    with open(csv_file, newline="") as csvfile:
        reader = list(csv.DictReader(csvfile))

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            executor.map(process_post, reader, [location] * len(reader), [retries] * len(reader))
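If the executor.map() call looks odd, here is a tiny standalone sketch of how it fans arguments out across threads; greet() is a toy stand-in for process_post():

from concurrent.futures import ThreadPoolExecutor

def greet(row, location, retries):
    # Receives one element from each iterable, just like process_post will
    print(f"{row} / {location} / {retries}")

rows = ["post-1", "post-2", "post-3"]
with ThreadPoolExecutor(max_workers=2) as executor:
    # Call i gets rows[i], locations[i], retries[i]
    executor.map(greet, rows, ["us"] * len(rows), [3] * len(rows))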
executor.map() takes process_post() as its first argument and then passes all of our other arguments into it as lists. This opens up a new thread to process each post and save its data to its own individual CSV file. For instance, if we have an article in the CSV generated earlier called "Headline Here", we'll now have a separate CSV file specifically for comments and metadata from the "Headline Here" article, and we'll have it fast.

We can now update process_post() to look like this:

def process_post(post_object, location="us", retries=3):
    tries = 0
    success = False

    permalink = post_object["permalink"]
    r_url = f"https://www.reddit.com{permalink}.json"
    link_array = permalink.split("/")
    filename = link_array[-2].replace(" ", "-")

    comment_pipeline = DataPipeline(csv_filename=f"{filename}.csv")

    while tries <= retries and not success:
        try:
            comment_data = requests.get(r_url, headers=headers)
            if comment_data.status_code != 200:
                raise Exception(f"Failed response: {comment_data.status_code}")
            comments = comment_data.json()
            comments_list = comments[1]["data"]["children"]
            for comment in comments_list:
                if comment["kind"] != "more":
                    data = comment["data"]
                    comment_data = CommentData(
                        name=data["author"],
                        body=data["body"],
                        upvotes=data["ups"]
                    )
                    comment_pipeline.add_data(comment_data)
            comment_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.warning(f"Failed to retrieve comment:\n{e}")
            tries += 1

    if not success:
        raise Exception(f"Max retries exceeded {retries}")
To add proxy support inside process_post(), we only need to change one line:

comment_data = requests.get(get_scrapeops_url(r_url, location=location), headers=headers)
.Here is our final python script that makes full use of both the crawler and scraper.import requestsfrom urllib.parse import urlencodeimport csv, json, timeimport logging, osfrom dataclasses import dataclass, field, fields, asdictfrom concurrent.futures import ThreadPoolExecutor headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRE-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" author: str = "" permalink: str = "" upvote_ratio: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CommentData: name: str = "" body: str = "" upvotes: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url #get posts from a subredditdef get_posts(feed, limit=100, retries=3, data_pipline=None): tries = 0 success = False while tries <= retries and not success: try: url = f"https://www.reddit.com/r/{feed}.json?limit={limit}" proxy_url = get_scrapeops_url(url) resp = requests.get(proxy_url, headers=headers) if resp.status_code == 200: success = True children = resp.json()["data"]["children"] for child in children: data = child["data"] #extract individual fields from the site data article_data = SearchData( name=data["title"], author=data["author_fullname"], permalink=data["permalink"], upvote_ratio=data["upvote_ratio"] ) data_pipline.add_data(article_data) else: logger.warning(f"Failed response: {resp.status_code}") raise Exception("Failed to get posts") except Exception as e: logger.warning(f"Exeception, failed to get posts: {e}") tries += 1 #process an individual postdef process_post(post_object, location="us", retries=3): tries = 0 success = False permalink = post_object["permalink"] r_url = f"https://www.reddit.com{permalink}.json" link_array = permalink.split("/") filename = link_array[-2].replace(" ", "-") comment_pipeline = DataPipeline(csv_filename=f"{filename}.csv") while tries <= retries and not success: try: comment_data = requests.get(get_scrapeops_url(r_url, location=location), headers=headers) if comment_data.status_code != 200: raise Exception(f"Failed response: {comment_data.status_code}") comments = comment_data.json() comments_list = comments[1]["data"]["children"] for comment in comments_list: if comment["kind"] != "more": data = comment["data"] comment_data = CommentData( name=data["author"], body=data["body"], upvotes=data["ups"] ) comment_pipeline.add_data(comment_data) comment_pipeline.close_pipeline() success = True except Exception as e: logger.warning(f"Failed to retrieve comment:\n{e}") tries += 1 if not success: raise Exception(f"Max retries exceeded {retries}") #process a batch of postsdef process_posts(csv_file, max_workers=5, location="us", retries=3): with open(csv_file, newline="") as csvfile: reader = list(csv.DictReader(csvfile)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map(process_post, reader, [location] * len(reader), [retries] * len(reader)) ########### MAIN FUNCTION ############# if __name__ == "__main__": FEEDS = ["news"] LOCATION = "us" BATCH_SIZE = 100 MAX_THREADS = 5 AGGREGATED_FEEDS = [] for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipline=feed_pipeline) feed_pipeline.close_pipeline() AGGREGATED_FEEDS.append(f"{feed_filename}.csv") for individual_file in AGGREGATED_FEEDS: process_posts(individual_file, location=LOCATION, max_workers=MAX_THREADS)
if __name__ == "__main__":
    FEEDS = ["news"]
    LOCATION = "us"
    BATCH_SIZE = 100
    MAX_THREADS = 5

    AGGREGATED_FEEDS = []

    for feed in FEEDS:
        feed_filename = feed.replace(" ", "-")
        feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv")
        get_posts(feed, limit=BATCH_SIZE, data_pipline=feed_pipeline)
        feed_pipeline.close_pipeline()
        AGGREGATED_FEEDS.append(f"{feed_filename}.csv")

    for individual_file in AGGREGATED_FEEDS:
        process_posts(individual_file, location=LOCATION, max_workers=MAX_THREADS)
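Assuming you've saved the script as reddit_scraper.py (the filename is up to you), a full run is just:

python reddit_scraper.py

With the settings above, this writes news.csv for the crawl and then one CSV of comments per post.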
To tweak the scraper, change the constants in the main block. If you want to run 4 threads, change MAX_THREADS to 4. If you want a BATCH_SIZE of 10, change it to 10. If you'd like to scrape a different Subreddit, just add it to the FEEDS list.

In the production run, we generated 100 CSV files all full of processed comments and metadata. It took 4 minutes and 40 seconds to generate all of the reports.

Then check out ScrapeOps, the complete toolkit for web scraping.

Just like before, to turn a Subreddit into a JSON feed all we need to do is add .json
to the end of our url.from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byfrom urllib.parse import urlencodeimport csv, json, timeimport logging, osfrom dataclasses import dataclass, field, fields, asdictfrom concurrent.futures import ThreadPoolExecutor user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'options = ChromeOptions()options.add_argument("--headless")options.add_argument(f"user-agent={user_agent}") proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" author: str = "" permalink: str = "" upvote_ratio: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CommentData: name: str = "" body: str = "" upvotes: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url #get posts from a subredditdef get_posts(feed, limit=100, retries=3, data_pipeline=None, location="us"): tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: url = f"https://www.reddit.com/r/{feed}.json?limit={limit}" driver.get(get_scrapeops_url(url, location=location)) json_text = driver.find_element(By.TAG_NAME, "pre").text resp = json.loads(json_text) if resp: success = True children = resp["data"]["children"] for child in children: data = child["data"] article_data = SearchData( name=data["title"], author=data["author"], permalink=data["permalink"], upvote_ratio=data["upvote_ratio"] ) data_pipeline.add_data(article_data) else: logger.warning(f"Failed response from server, tries left: {retries-tries}") raise Exception("Failed to get posts") except Exception as e: driver.save_screenshot(f"error-{tries}.png") logger.warning(f"Exeception, failed to get posts: {e}") tries += 1 finally: driver.quit() def process_post(post_object, location="us", retries=3): tries = 0 success = False permalink = post_object["permalink"] r_url = f"https://www.reddit.com{permalink}.json" link_array = permalink.split("/") filename = link_array[-2].replace(" ", "-") comment_pipeline = DataPipeline(csv_filename=f"{filename}.csv") while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(get_scrapeops_url(r_url, location=location)) comment_data = driver.find_element(By.TAG_NAME, "pre").text if not comment_data: raise Exception(f"Failed response: {comment_data.status_code}") comments = json.loads(comment_data) comments_list = comments[1]["data"]["children"] for comment in comments_list: if comment["kind"] != "more": data = comment["data"] comment_data = CommentData( name=data["author"], body=data["body"], upvotes=data["ups"] ) comment_pipeline.add_data(comment_data) comment_pipeline.close_pipeline() success = True except Exception as e: logger.warning(f"Failed to retrieve comment:\n{e}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded {retries}") #process a batch of postsdef process_posts(csv_file, max_workers=5, location="us", retries=3): with open(csv_file, newline="") as csvfile: reader = list(csv.DictReader(csvfile)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map(process_post, reader, [location] * len(reader), [retries] * len(reader)) ########### MAIN FUNCTION ############# if __name__ == "__main__": FEEDS = ["news"] BATCH_SIZE = 100 MAX_THREADS = 11 AGGREGATED_FEEDS = [] for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipeline=feed_pipeline) feed_pipeline.close_pipeline() AGGREGATED_FEEDS.append(f"{feed_filename}.csv") for individual_file in AGGREGATED_FEEDS: process_posts(individual_file, max_workers=MAX_THREADS)
To customize your results, you can change the FEEDS array. You can also change the MAX_THREADS constant (it is set to run with 11 by default); if your CPU takes advantage of hyperthreading, you can actually use more threads than you have cores. You can also set BATCH_SIZE to 100.

Here is a standard Reddit URL:

https://www.reddit.com/r/news/?rdt=51809

And here is the JSON feed for the same page:

https://www.reddit.com/r/news.json

By changing /?rdt=51809 to .json, we've turned Reddit into a full-blown feed!

Here is an example of a JSON object:

{"name": "John Doe", "age": 30}

Compare that with the equivalent object literal:

{name: "John Doe", age: 30}

To access a dict in Python we simply use its keys. Our entire list of content comes in our resp, or response object. With a standard HTTP client like Python Requests, we would simply use resp.json(). With Selenium, we get the page and then we use json.loads() to convert the text string into a dict. To access each article in the list, all we have to do is change our index:

resp["data"]["children"][0]
resp["data"]["children"][1]
resp["data"]["children"][2]

Our feed url is https://www.reddit.com/r/news.json. We can add a limit parameter to this url for finer control of our results. If we want 100 news results, our url would look like this:

https://www.reddit.com/r/news.json?limit=100

Since Selenium gives us the page as raw text, we parse it into Python objects with json.loads().
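Here is a minimal sketch of that flow with Selenium, assuming headless Chrome; Reddit renders the raw JSON inside a pre tag, which is where we read it from:

from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
import json

options = ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
try:
    driver.get("https://www.reddit.com/r/news.json?limit=100")
    # The raw JSON is rendered inside a pre tag
    json_text = driver.find_element(By.TAG_NAME, "pre").text
    resp = json.loads(json_text)
    print(resp["data"]["children"][0]["data"]["title"])
finally:
    driver.quit()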
Create a new project folder called reddit-scraper:

mkdir reddit-scraper

Then create a virtual environment, activate it, and install Selenium:

python -m venv venv
source venv/bin/activate
pip install selenium
The heart of our scraper is get_posts(). This function takes one argument, feed, and a kwarg, retries, which is set to 3 by default. While we still have retries left, we try to fetch the json feed from Reddit. If we run out of retries, we raise an Exception and allow the crawler to crash.

Reddit serves the raw JSON inside a pre tag, so we simply use Selenium to find the pre tag before loading our text. Our posts come back in an array called children. Each item in this array represents an individual Reddit post, and each one holds its useful information in a data field. From that data field, we pull the title, author_fullname, permalink, and upvote_ratio.
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
from urllib.parse import urlencode
import csv, json, time
import logging, os
from dataclasses import dataclass, field, fields, asdict
from concurrent.futures import ThreadPoolExecutor

user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'
options = ChromeOptions()
options.add_argument("--headless")
options.add_argument(f"user-agent={user_agent}")

proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#get posts from a subreddit
def get_posts(feed, retries=3):
    tries = 0
    success = False
    while tries <= retries and not success:
        driver = webdriver.Chrome(options=options)
        try:
            url = f"https://www.reddit.com/r/{feed}.json"
            driver.get(url)
            json_text = driver.find_element(By.TAG_NAME, "pre").text
            resp = json.loads(json_text)
            if resp:
                success = True
                children = resp["data"]["children"]
                for child in children:
                    data = child["data"]
                    #extract individual fields from the site data
                    name = data["title"]
                    author = data["author_fullname"]
                    permalink = data["permalink"]
                    upvote_ratio = data["upvote_ratio"]
                    #print the extracted data
                    print(f"Name: {name}")
                    print(f"Author: {author}")
                    print(f"Permalink: {permalink}")
                    print(f"Upvote Ratio: {upvote_ratio}")
            else:
                logger.warning(f"Failed response: {resp.status_code}")
                raise Exception("Failed to get posts")
        except Exception as e:
            driver.save_screenshot(f"error-{tries}.png")
            logger.warning(f"Exeception, failed to get posts: {e}")
            tries += 1
        finally:
            driver.quit()

########### MAIN FUNCTION #############

if __name__ == "__main__":
    FEEDS = ["news"]
    for feed in FEEDS:
        get_posts(feed)
To control the size of our results, we add a limit parameter to our url. Let's refactor our get_posts() function to take an additional keyword, limit. Taking our limit into account, our url will now look like this:

https://www.reddit.com/r/{feed}.json?limit={limit}
from selenium import webdriver
from selenium.webdriver import ChromeOptions
from selenium.webdriver.common.by import By
from urllib.parse import urlencode
import csv, json, time
import logging, os
from dataclasses import dataclass, field, fields, asdict
from concurrent.futures import ThreadPoolExecutor

user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'
options = ChromeOptions()
options.add_argument("--headless")
options.add_argument(f"user-agent={user_agent}")

proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#get posts from a subreddit
def get_posts(feed, limit=100, retries=3):
    tries = 0
    success = False
    while tries <= retries and not success:
        driver = webdriver.Chrome(options=options)
        try:
            url = f"https://www.reddit.com/r/{feed}.json?limit={limit}"
            driver.get(url)
            json_text = driver.find_element(By.TAG_NAME, "pre").text
            resp = json.loads(json_text)
            if resp:
                success = True
                children = resp["data"]["children"]
                for child in children:
                    data = child["data"]
                    #extract individual fields from the site data
                    name = data["title"]
                    author = data["author_fullname"]
                    permalink = data["permalink"]
                    upvote_ratio = data["upvote_ratio"]
                    #print the extracted data
                    print(f"Name: {name}")
                    print(f"Author: {author}")
                    print(f"Permalink: {permalink}")
                    print(f"Upvote Ratio: {upvote_ratio}")
            else:
                logger.warning(f"Failed response: {resp.status_code}")
                raise Exception("Failed to get posts")
        except Exception as e:
            driver.save_screenshot(f"error-{tries}.png")
            logger.warning(f"Exeception, failed to get posts: {e}")
            tries += 1
        finally:
            driver.quit()

########### MAIN FUNCTION #############

if __name__ == "__main__":
    FEEDS = ["news"]
    BATCH_SIZE = 2
    for feed in FEEDS:
        get_posts(feed, limit=BATCH_SIZE)
In the code above, we add a limit parameter to get_posts(). We also declare a new constant in our main, BATCH_SIZE. We pass our BATCH_SIZE into get_posts() to control the size of our results. limit is incredibly important. We don't want to scrape through hundreds of results if we only need 10... and we certainly don't want to try scraping hundreds of results when we're only limited to 10!

Next, we add a SearchData class and a DataPipeline class as well. SearchData is going to be relatively simple; all it's going to do is hold individual data. DataPipeline will be doing the real heavy lifting. Our DataPipeline class will be doing all the work of removing duplicates and saving our SearchData objects to CSV. In this example, we utilize SearchData to hold the data we've extracted and we then pass it into the DataPipeline.
.from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byfrom urllib.parse import urlencodeimport csv, json, timeimport logging, osfrom dataclasses import dataclass, field, fields, asdictfrom concurrent.futures import ThreadPoolExecutor user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'options = ChromeOptions()options.add_argument("--headless")options.add_argument(f"user-agent={user_agent}") proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" author: str = "" permalink: str = "" upvote_ratio: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() #get posts from a subredditdef get_posts(feed, limit=100, retries=3, data_pipeline=None): tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: url = f"https://www.reddit.com/r/{feed}.json?limit={limit}" driver.get(url) json_text = driver.find_element(By.TAG_NAME, "pre").text resp = json.loads(json_text) if resp: success = True children = resp["data"]["children"] for child in children: data = child["data"] article_data = SearchData( name=data["title"], author=data["author"], permalink=data["permalink"], upvote_ratio=data["upvote_ratio"] ) data_pipeline.add_data(article_data) else: logger.warning(f"Failed response from server, tries left: {retries-tries}") raise Exception("Failed to get posts") except Exception as e: driver.save_screenshot(f"error-{tries}.png") logger.warning(f"Exeception, failed to get posts: {e}") tries += 1 finally: driver.quit() ########### MAIN FUNCTION ############# if __name__ == "__main__": FEEDS = ["news"] BATCH_SIZE = 2 for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipeline=feed_pipeline) feed_pipeline.close_pipeline()
Just like before, we open a DataPipeline and pass it into get_posts(). From inside get_posts(), we get our post data and turn it into a SearchData object. This object then gets passed into the DataPipeline, which removes duplicates and saves everything to a CSV.

One small change from the Requests version: we replace author_fullname with author. This allows us to see the actual display name of each poster.

Next, we add proxy support with get_scrapeops_url(). This function takes in a regular url and uses simple string formatting to create a proxied url.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byfrom urllib.parse import urlencodeimport csv, json, timeimport logging, osfrom dataclasses import dataclass, field, fields, asdictfrom concurrent.futures import ThreadPoolExecutor user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'options = ChromeOptions()options.add_argument("--headless")options.add_argument(f"user-agent={user_agent}") proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" author: str = "" permalink: str = "" upvote_ratio: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url #get posts from a subredditdef get_posts(feed, limit=100, retries=3, data_pipeline=None, location="us"): tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: url = f"https://www.reddit.com/r/{feed}.json?limit={limit}" driver.get(get_scrapeops_url(url, location=location)) json_text = driver.find_element(By.TAG_NAME, "pre").text resp = json.loads(json_text) if resp: success = True children = resp["data"]["children"] for child in children: data = child["data"] article_data = SearchData( name=data["title"], author=data["author"], permalink=data["permalink"], upvote_ratio=data["upvote_ratio"] ) data_pipeline.add_data(article_data) else: logger.warning(f"Failed response from server, tries left: {retries-tries}") raise Exception("Failed to get posts") except Exception as e: driver.save_screenshot(f"error-{tries}.png") logger.warning(f"Exeception, failed to get posts: {e}") tries += 1 finally: driver.quit() ########### MAIN FUNCTION ############# if __name__ == "__main__": FEEDS = ["news"] BATCH_SIZE = 2 for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipeline=feed_pipeline) feed_pipeline.close_pipeline()
proxy_url
by passing our url
into get_scrapeops_url()
. We then pass the result directly into driver.get()
so Selenium will navigate to the proxied url instead of the regular one.########### MAIN FUNCTION ############# if __name__ == "__main__": FEEDS = ["news"] BATCH_SIZE = 100 for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipeline=feed_pipeline) feed_pipeline.close_pipeline()
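Isolated from the rest of the scraper, the proxied navigation boils down to just a few lines. Here's a minimal sketch, assuming the options object and get_scrapeops_url() helper defined above (plus a valid API key):

from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome(options=options)
try:
    # Build the proxied url, then hand it to Selenium like any other url.
    target_url = "https://www.reddit.com/r/news.json?limit=10"
    driver.get(get_scrapeops_url(target_url, location="us"))
    # The raw JSON is rendered inside a <pre> tag in the browser.
    raw_json = driver.find_element(By.TAG_NAME, "pre").text
    print(raw_json[:200])
finally:
    driver.quit()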
permalink
from post objects we created earlier in our crawler. We're not ready to run this code yet because we still need to be able to read the CSV file we created earlier. If we can't read the file, our scraper won't know which posts to process.def process_post(post_object, location="us", retries=3): tries = 0 success = False permalink = post_object["permalink"] r_url = f"https://www.reddit.com{permalink}.json" link_array = permalink.split("/") filename = link_array[-2].replace(" ", "-") while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(r_url) comment_data = driver.find_element(By.TAG_NAME, "pre").text if not comment_data: raise Exception(f"Failed response: {comment_data.status_code}") comments = json.loads(comment_data) comments_list = comments[1]["data"]["children"] for comment in comments_list: if comment["kind"] != "more": data = comment["data"] comment_data = { "name": data["author"], "body": data["body"], "upvotes": data["ups"] } print(f"Comment: {comment_data}") success = True except Exception as e: logger.warning(f"Failed to retrieve comment:\n{e}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded {retries}")
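To see what that parsing is working with, here's a rough, illustrative sketch of the JSON a post's .json endpoint returns: it's a two-item array, where the first item describes the post itself and the second holds the comment tree.

# Rough shape of https://www.reddit.com/<permalink>.json (illustrative only).
example_post_json = [
    {"kind": "Listing", "data": {"children": [
        {"kind": "t3", "data": {"title": "The post itself"}},
    ]}},
    {"kind": "Listing", "data": {"children": [
        {"kind": "t1", "data": {"author": "commenter_one", "body": "First comment", "ups": 42}},
        {"kind": "t1", "data": {"author": "commenter_two", "body": "A reply", "ups": 7}},
        {"kind": "more", "data": {"children": ["d4e5f6"]}},  # stub for "load more comments"
    ]}},
]

comments_list = example_post_json[1]["data"]["children"]
real_comments = [c["data"] for c in comments_list if c["kind"] != "more"]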
comment["kind"]
is not "more"
, we assume that these are comments we want to process.We pull the author
, body
, and upvotes
for each individual comment. If someone wants to analyze this data at scale, they can then accurately compare which types of comments get the best reactions from people. We read the saved posts back with csv.DictReader()
, which allows us to read individual rows from the CSV file. We'll call process_post()
on each row we read from the file.Here is the full code example that reads rows from the CSV file and processes them. We have an additional function, process_posts()
. It uses a for
loop as just a placeholder for now, but later on, this function will be rewritten to use multithreading.from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byfrom urllib.parse import urlencodeimport csv, json, timeimport logging, osfrom dataclasses import dataclass, field, fields, asdictfrom concurrent.futures import ThreadPoolExecutor user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'options = ChromeOptions()options.add_argument("--headless")options.add_argument(f"user-agent={user_agent}") proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" author: str = "" permalink: str = "" upvote_ratio: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url #get posts from a subredditdef get_posts(feed, limit=100, retries=3, data_pipeline=None, location="us"): tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: url = f"https://www.reddit.com/r/{feed}.json?limit={limit}" driver.get(get_scrapeops_url(url, location=location)) json_text = driver.find_element(By.TAG_NAME, "pre").text resp = json.loads(json_text) if resp: success = True children = resp["data"]["children"] for child in children: data = child["data"] article_data = SearchData( name=data["title"], author=data["author"], permalink=data["permalink"], upvote_ratio=data["upvote_ratio"] ) data_pipeline.add_data(article_data) else: logger.warning(f"Failed response from server, tries left: {retries-tries}") raise Exception("Failed to get posts") except Exception as e: driver.save_screenshot(f"error-{tries}.png") logger.warning(f"Exeception, failed to get posts: {e}") tries += 1 finally: driver.quit() def process_post(post_object, location="us", retries=3): tries = 0 success = False permalink = post_object["permalink"] r_url = f"https://www.reddit.com{permalink}.json" link_array = permalink.split("/") filename = link_array[-2].replace(" ", "-") while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(r_url) comment_data = driver.find_element(By.TAG_NAME, "pre").text if not comment_data: raise Exception(f"Failed response: {comment_data.status_code}") comments = json.loads(comment_data) comments_list = comments[1]["data"]["children"] for comment in comments_list: if comment["kind"] != "more": data = comment["data"] comment_data = { "name": data["author"], "body": data["body"], "upvotes": data["ups"] } print(f"Comment: {comment_data}") success = True except Exception as e: logger.warning(f"Failed to retrieve comment:\n{e}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded {retries}") #process a batch of postsdef process_posts(csv_file, max_workers=5, location="us"): with open(csv_file, newline="") as csvfile: reader = list(csv.DictReader(csvfile)) for row in reader: process_post(row) ########### MAIN FUNCTION ############# if __name__ == "__main__": FEEDS = ["news"] BATCH_SIZE = 10 AGGREGATED_FEEDS = [] for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipeline=feed_pipeline) feed_pipeline.close_pipeline() AGGREGATED_FEEDS.append(f"{feed_filename}.csv") for individual_file in AGGREGATED_FEEDS: process_posts(individual_file)
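Since process_post() indexes each row with post_object["permalink"], it's worth seeing what csv.DictReader() actually hands back: one dictionary per row, keyed by the CSV header, with every value read back as a string. A quick, hypothetical illustration:

import csv

# Hypothetical contents of news.csv produced by the crawler.
with open("news.csv", newline="", encoding="utf-8") as csvfile:
    rows = list(csv.DictReader(csvfile))

print(rows[0])
# e.g. {'name': 'Example headline', 'author': 'example_user',
#       'permalink': '/r/news/comments/abc123/example_headline/', 'upvote_ratio': '0.91'}
print(rows[0]["permalink"])  # exactly what process_post() needs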
process_posts()
to read all the data from a Subreddit CSV. This function runs process_post()
on each individual post so we can extract important comment data from the post.CommentData
. Similar to SearchData
, the purpose of this class is to simply hold the data that we want to scrape.Once we've got CommentData
, we pass it straight into a DataPipeline
.@dataclassclass CommentData: name: str = "" body: str = "" upvotes: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip())
process_post()
rewritten to store our data.def process_post(post_object, location="us", retries=3): tries = 0 success = False permalink = post_object["permalink"] r_url = f"https://www.reddit.com{permalink}.json" link_array = permalink.split("/") filename = link_array[-2].replace(" ", "-") comment_pipeline = DataPipeline(csv_filename=f"{filename}.csv") while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(r_url) comment_data = driver.find_element(By.TAG_NAME, "pre").text if not comment_data: raise Exception(f"Failed response: {comment_data.status_code}") comments = json.loads(comment_data) comments_list = comments[1]["data"]["children"] for comment in comments_list: if comment["kind"] != "more": data = comment["data"] comment_data = CommentData( name=data["author"], body=data["body"], upvotes=data["ups"] ) comment_pipeline.add_data(comment_data) comment_pipeline.close_pipeline() success = True except Exception as e: logger.warning(f"Failed to retrieve comment:\n{e}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded {retries}")
DataPipeline
of its own. Each post gets its own DataPipeline
so we can store its comment data safely and efficiently. This code might get you blocked. Selenium is faster than any human could possibly be, and Reddit will notice the abnormalities. After we add concurrency, we're going to add proxy support to our scraper as well. To add concurrency, we use ThreadPoolExecutor
. This allows us to open a new pool of threads with however many max_workers
we want to specify. This is actually going to increase our likelihood of getting blocked, so adding proxy support in the next section is super important! The reason is simple: our scraper was already really fast, and now it's dramatically faster. Here is our new process_posts()
:#process a batch of postsdef process_posts(csv_file, max_workers=5, location="us", retries=3): with open(csv_file, newline="") as csvfile: reader = list(csv.DictReader(csvfile)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map(process_post, reader, [location] * len(reader), [retries] * len(reader))
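If the executor.map() call looks dense, here's a toy, self-contained example of the same pattern: the first iterable supplies the first argument for each call, and the repeated lists supply the remaining arguments.

from concurrent.futures import ThreadPoolExecutor

def greet(name, greeting, punctuation):
    return f"{greeting}, {name}{punctuation}"

names = ["post_one", "post_two", "post_three"]
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(greet, names, ["Hello"] * len(names), ["!"] * len(names)))

print(results)
# ['Hello, post_one!', 'Hello, post_two!', 'Hello, post_three!']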
executor.map()
takes process_post()
as its first argument, then passes each of our other arguments into it as lists. This opens up a new thread to process each post and save its data to its own individual CSV file. For instance, if the CSV we generated earlier contains an article called "Headline Here", we'll now get a separate CSV file holding the comments and metadata from the "Headline Here" article, and we'll get it fast. To add proxy support to process_post()
, we only need to change one line: driver.get(get_scrapeops_url(r_url, location=location))
.We once again pass get_scrapeops_url()
directly into driver.get()
so our scraper navigates directly to the proxied url.Here is our final Python script that makes full use of both the crawler and scraper.from selenium import webdriverfrom selenium.webdriver import ChromeOptionsfrom selenium.webdriver.common.by import Byfrom urllib.parse import urlencodeimport csv, json, timeimport logging, osfrom dataclasses import dataclass, field, fields, asdictfrom concurrent.futures import ThreadPoolExecutor user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'options = ChromeOptions()options.add_argument("--headless")options.add_argument(f"user-agent={user_agent}") proxy_url = "https://proxy.scrapeops.io/v1/"API_KEY = "YOUR-SUPER-SECRET-API-KEY" logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" author: str = "" permalink: str = "" upvote_ratio: float = 0.0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass CommentData: name: str = "" body: str = "" upvotes: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == '': setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename='', storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode='a', newline='', encoding='utf-8') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location } proxy_url = "https://proxy.scrapeops.io/v1/?" 
+ urlencode(payload) return proxy_url #get posts from a subredditdef get_posts(feed, limit=100, retries=3, data_pipeline=None, location="us"): tries = 0 success = False while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: url = f"https://www.reddit.com/r/{feed}.json?limit={limit}" driver.get(get_scrapeops_url(url, location=location)) json_text = driver.find_element(By.TAG_NAME, "pre").text resp = json.loads(json_text) if resp: success = True children = resp["data"]["children"] for child in children: data = child["data"] article_data = SearchData( name=data["title"], author=data["author"], permalink=data["permalink"], upvote_ratio=data["upvote_ratio"] ) data_pipeline.add_data(article_data) else: logger.warning(f"Failed response from server, tries left: {retries-tries}") raise Exception("Failed to get posts") except Exception as e: driver.save_screenshot(f"error-{tries}.png") logger.warning(f"Exeception, failed to get posts: {e}") tries += 1 finally: driver.quit() def process_post(post_object, location="us", retries=3): tries = 0 success = False permalink = post_object["permalink"] r_url = f"https://www.reddit.com{permalink}.json" link_array = permalink.split("/") filename = link_array[-2].replace(" ", "-") comment_pipeline = DataPipeline(csv_filename=f"{filename}.csv") while tries <= retries and not success: driver = webdriver.Chrome(options=options) try: driver.get(get_scrapeops_url(r_url, location=location)) comment_data = driver.find_element(By.TAG_NAME, "pre").text if not comment_data: raise Exception(f"Failed response: {comment_data.status_code}") comments = json.loads(comment_data) comments_list = comments[1]["data"]["children"] for comment in comments_list: if comment["kind"] != "more": data = comment["data"] comment_data = CommentData( name=data["author"], body=data["body"], upvotes=data["ups"] ) comment_pipeline.add_data(comment_data) comment_pipeline.close_pipeline() success = True except Exception as e: logger.warning(f"Failed to retrieve comment:\n{e}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max retries exceeded {retries}") #process a batch of postsdef process_posts(csv_file, max_workers=5, location="us", retries=3): with open(csv_file, newline="") as csvfile: reader = list(csv.DictReader(csvfile)) with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map(process_post, reader, [location] * len(reader), [retries] * len(reader)) ########### MAIN FUNCTION ############# if __name__ == "__main__": FEEDS = ["news"] BATCH_SIZE = 10 MAX_THREADS = 11 AGGREGATED_FEEDS = [] for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipeline=feed_pipeline) feed_pipeline.close_pipeline() AGGREGATED_FEEDS.append(f"{feed_filename}.csv") for individual_file in AGGREGATED_FEEDS: process_posts(individual_file, max_workers=MAX_THREADS)
if __name__ == "__main__": FEEDS = ["news"] LOCATION = "us" BATCH_SIZE = 100 MAX_THREADS = 11 AGGREGATED_FEEDS = [] for feed in FEEDS: feed_filename = feed.replace(" ", "-") feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv") get_posts(feed, limit=BATCH_SIZE, data_pipeline=feed_pipeline) feed_pipeline.close_pipeline() AGGREGATED_FEEDS.append(f"{feed_filename}.csv") for individual_file in AGGREGATED_FEEDS: process_posts(individual_file, location=LOCATION, max_workers=MAX_THREADS)
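Everything you might want to tune lives in these constants. For example, a lighter run (hypothetical values) could look like this:

if __name__ == "__main__":
    FEEDS = ["news", "worldnews"]   # add or swap subreddits here
    LOCATION = "us"
    BATCH_SIZE = 10                 # only fetch 10 posts per feed
    MAX_THREADS = 4                 # gentler on your machine and on Reddit
    AGGREGATED_FEEDS = []

    for feed in FEEDS:
        feed_filename = feed.replace(" ", "-")
        feed_pipeline = DataPipeline(csv_filename=f"{feed_filename}.csv")
        get_posts(feed, limit=BATCH_SIZE, data_pipeline=feed_pipeline)
        feed_pipeline.close_pipeline()
        AGGREGATED_FEEDS.append(f"{feed_filename}.csv")

    for individual_file in AGGREGATED_FEEDS:
        process_posts(individual_file, location=LOCATION, max_workers=MAX_THREADS)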
MAX_THREADS
to 4. If you want a BATCH_SIZE
of 10, change it to 10. If you'd like to scrape a different Subreddit, just add it to the FEEDS
list. In the production run, we generated 100 CSV files, all full of processed comments and metadata. It took 1 minute and 57 seconds to create our article list and generate all 100 of the reports. Then check out ScrapeOps, the complete toolkit for web scraping.
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = "YOUR-SUPER-SECRET-API-KEY"; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function getPosts(browser, feed, limit=10, retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { const url = `https://www.reddit.com/r/${feed}.json?limit=${limit}`; await page.goto(getScrapeOpsUrl(url)); success = true; const jsonText = await page.$eval("pre", pre => pre.textContent); const resp = JSON.parse(jsonText); if (resp) { const children = resp.data.children; for (const child of children) { data = child.data; const articleData = { name: data.title, author: data.author, permalink: data.permalink, upvoteRatio: data.upvote_ratio } if (!namesSeen.includes(articleData.name)) { try { await writeToCsv([articleData], `./${feed}.csv`); namesSeen.push(articleData.name); } catch { throw new Error("failed to write csv file:", articleData); } } } } } catch (e) { console.log(`ERROR: ${e}`); tries++; } finally { await page.close(); } }} async function processPost(browser, postObject, location="us", retries=3) { let tries = 0; let success = false; const r_url = `https://www.reddit.com${postObject.permalink}.json`; const linkArray = postObject.permalink.split("/"); const fileName = linkArray[linkArray.length-2].replace(" ", "-"); while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(getScrapeOpsUrl(r_url), {timeout: 30000}); const commentData = await page.$eval("pre", pre => pre.textContent); if (!commentData) { throw new Error(`No comment data found: ${fileName}`); } const comments = JSON.parse(commentData); const commentsList = comments[1].data.children; if (commentsList.length === 0) { return; } for (const comment of commentsList) { if (comment.kind !== "more") { const data = comment.data; const commentData = { name: data.author, body: data.body, upvotes: data.ups } await writeToCsv([commentData], `${fileName}.csv`); success = true; } } } catch (e) { await page.screenshot({path: `ERROR-${fileName}.png`}); console.log(`Error fetching comments for ${fileName}, retries left: ${retries - tries}`); tries++; } finally { await page.close(); } } if (!success) { console.log(`Max retries exceeded for: ${postObject.permalink}`); return; } return;} async function processPosts(browser, inputFile, concurrencyLimit, location="us", retries=3) { const posts = await readCsv(inputFile); while (posts.length > 0) { const currentBatch = posts.splice(0, 
concurrencyLimit); const tasks = currentBatch.map(post => processPost(browser, post, location, retries)); try { await Promise.all(tasks); } catch (e) { console.log("Failed to process batch"); } } } async function main() { const FEEDS = ["news"]; const RETRIES = 4; const BATCH_SIZE = 40; const concurrencyLimit = 10; AGGREGATED_FEEDS = []; const browser = await puppeteer.launch(); for (const feed of FEEDS) { await getPosts(browser, feed, limit=BATCH_SIZE, RETRIES); AGGREGATED_FEEDS.push(`${feed}.csv`); } for (const individualFile of AGGREGATED_FEEDS) { await processPosts(browser, individualFile,concurrencyLimit, RETRIES); } await browser.close();}main();
concurrencyLimit
is the limit on how many pages you want Puppeteer to open at once. If you notice performance issues, reduce your concurrencyLimit
.https://www.reddit.com/r/news/?rdt=51809.
https://www.reddit.com/r/news.json.
/?rdt=51809
to .json
, we've turned Reddit into a full-blown JSON feed!const person = {name: "John Doe", age: 30};
console.log(`Name: ${person.name}`);console.log(`Age: ${person.age}`);
resp
or response object. Once we have our response object, we can use .
to access different parts of the object.Below, you'll see how we access individual articles from this JSON.resp.data.children[0]
resp.data.children[1]
resp.data.children[2]
https://www.reddit.com/r/news.json
. We can add a limit parameter to this url for finer control of our results.If we want 100 news results, our url would look like this:https://www.reddit.com/r/news.json?limit=100
JSON.parse()
.reddit-scraper
.mkdir reddit-scraper
cd reddit-scraper
npm init --y
npm install puppeteer
npm install csv-writer
npm install csv-parse
getPosts()
. This function takes three arguments: browser
, feed
, and retries
(this one is a keyword argument) which is set to 3 by default.Error
and allow the crawler to crash.pre
tag, so we simply use Puppeteer to find the pre
tag with page.$eval()
before loading our text.data
field.title
, author
, permalink
, and upvote_ratio
.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = "YOUR-SUPER-SECRET-API-KEY";const DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"; async function getPosts(browser, feed, retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { const url = `https://www.reddit.com/r/${feed}.json?`; await page.setUserAgent(DEFAULT_USER_AGENT); await page.goto(url); success = true; const jsonText = await page.$eval("pre", pre => pre.textContent); const resp = JSON.parse(jsonText); if (resp) { const children = resp.data.children; for (const child of children) { data = child.data; const articleData = { name: data.title, author: data.author, permalink: data.permalink, upvoteRatio: data.upvote_ratio } if (!namesSeen.includes(articleData.name)) { console.log(articleData); namesSeen.push(articleData.name); } } } } catch (e) { await page.screenshot({path: "error.png"}); console.log(`ERROR: ${e}`); tries++; } finally { await page.close(); } }} async function main() { const FEEDS = ["news"]; const RETRIES = 4; const BATCH_SIZE = 10; AGGREGATED_FEEDS = []; const browser = await puppeteer.launch(); for (const feed of FEEDS) { await getPosts(browser, feed, RETRIES); AGGREGATED_FEEDS.push(`${feed}.csv`); } await browser.close();} main();
getPosts()
function to take an additional keyword, limit
. Taking our limit
into account, our url will now look like this:https://www.reddit.com/r/{feed}.json?limit={limit}
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = "YOUR-SUPER-SECRET-API-KEY";const DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"; async function getPosts(browser, feed, limit=10, retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { const url = `https://www.reddit.com/r/${feed}.json?limit=${limit}`; await page.setUserAgent(DEFAULT_USER_AGENT); await page.goto(url); success = true; const jsonText = await page.$eval("pre", pre => pre.textContent); const resp = JSON.parse(jsonText); if (resp) { const children = resp.data.children; for (const child of children) { data = child.data; const articleData = { name: data.title, author: data.author, permalink: data.permalink, upvoteRatio: data.upvote_ratio } if (!namesSeen.includes(articleData.name)) { console.log(articleData); namesSeen.push(articleData.name); } } } } catch (e) { await page.screenshot({path: "error.png"}); console.log(`ERROR: ${e}`); tries++; } finally { await page.close(); } }} async function main() { const FEEDS = ["news"]; const RETRIES = 4; const BATCH_SIZE = 10; AGGREGATED_FEEDS = []; const browser = await puppeteer.launch(); for (const feed of FEEDS) { await getPosts(browser, feed, BATCH_SIZE, RETRIES); AGGREGATED_FEEDS.push(`${feed}.csv`); } await browser.close();} main();
limit
parameter to getPosts()
. We pass our BATCH_SIZE
into getPosts()
to control the size of our results. Feel free to try changing the batch size and examining your results. limit
is incredibly important.We don't want to scrape through hundreds of results if we only need 10... and we certainly don't want to try scraping hundreds of results when we're only limited to 10!This limit is the foundation of all the data we're going to scrape.writeToCsv()
function and our namesSeen
array will actually filter repeats out of the data we choose to save.namesSeen
to filter out repeat data (just like we have been, but now it's particularly important). All data that we haven't seen goes straight into the CSV file. We write each object to CSV individually for safety: in the event of a crash, our scraper loses at most a single result rather than a whole batch, and everything scraped up to that point will already be in our CSV. Also, pay attention to the fileExists
variable in the writeCsv function. This is a simple boolean
value. If it's true, we append the file. If the file doesn't exist, we create it.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = "YOUR-SUPER-SECRET-API-KEY";const DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { return; } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); console.log(`successfully wrote data to ${outputFile}`); } catch (e) { console.log(`failed to write to csv: ${e}`); }} async function getPosts(browser, feed, limit=10, retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { const url = `https://www.reddit.com/r/${feed}.json?limit=${limit}`; await page.setUserAgent(DEFAULT_USER_AGENT); await page.goto(url); success = true; const jsonText = await page.$eval("pre", pre => pre.textContent); const resp = JSON.parse(jsonText); if (resp) { const children = resp.data.children; for (const child of children) { data = child.data; const articleData = { name: data.title, author: data.author, permalink: data.permalink, upvoteRatio: data.upvote_ratio } if (!namesSeen.includes(articleData.name)) { await writeToCsv([articleData], `./${feed}.csv`); namesSeen.push(articleData.name); } } } } catch (e) { console.log(`ERROR: ${e}`); tries++; } finally { await page.close(); } }} async function main() { const FEEDS = ["news"]; const RETRIES = 4; const BATCH_SIZE = 10; AGGREGATED_FEEDS = []; const browser = await puppeteer.launch(); for (const feed of FEEDS) { await getPosts(browser, feed, BATCH_SIZE, RETRIES); AGGREGATED_FEEDS.push(`${feed}.csv`); } await browser.close();} main();
writeCsv()
to write each object to a CSV file as it comes. This lets us write each object as soon as we find it, so nothing sits in memory while we complete the for loop. We also use a boolean
, fileExists
which tells the writer whether to create or append to our CSV file. Next, we route our page.goto()
requests through the ScrapeOps Proxy API Aggregator. This API gives us the benefit of rotating IP addresses, and it always selects the best proxy available. In this code snippet, we create a small helper function, getScrapeOpsUrl().
This function takes in a regular url and uses simple string formatting to create a proxied url.function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;}
const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = "YOUR-SUPER-SECRET-API-KEYS"; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { return; } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); console.log(`successfully wrote data to ${outputFile}`); } catch (e) { console.log(`failed to write to csv: ${e}`); }} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function getPosts(browser, feed, limit=10, retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { const url = `https://www.reddit.com/r/${feed}.json?limit=${limit}`; await page.goto(getScrapeOpsUrl(url)); success = true; const jsonText = await page.$eval("pre", pre => pre.textContent); const resp = JSON.parse(jsonText); if (resp) { const children = resp.data.children; for (const child of children) { data = child.data; const articleData = { name: data.title, author: data.author, permalink: data.permalink, upvoteRatio: data.upvote_ratio } if (!namesSeen.includes(articleData.name)) { await writeToCsv([articleData], `./${feed}.csv`); namesSeen.push(articleData.name); } } } } catch (e) { console.log(`ERROR: ${e}`); tries++; } finally { await page.close(); } }} async function main() { const FEEDS = ["news"]; const RETRIES = 4; const BATCH_SIZE = 10; AGGREGATED_FEEDS = []; const browser = await puppeteer.launch(); for (const feed of FEEDS) { await getPosts(browser, feed, BATCH_SIZE, RETRIES); AGGREGATED_FEEDS.push(`${feed}.csv`); } await browser.close();} main();
url
into getScrapeOpsUrl()
. We then pass the result directly into page.goto()
so Puppeteer will take us to the new proxied url instead of the regular one.async function main() { const FEEDS = ["news"]; const RETRIES = 4; const BATCH_SIZE = 100; AGGREGATED_FEEDS = []; const browser = await puppeteer.launch(); for (const feed of FEEDS) { await getPosts(browser, feed, BATCH_SIZE, RETRIES); AGGREGATED_FEEDS.push(`${feed}.csv`); } await browser.close();}
permalink
from post objects we created earlier in our crawler. We're not ready to run this code yet, because we need to be able to read the CSV file we created earlier. If we can't read the file, our scraper won't know which posts to process.async function processPost(browser, postObject, location="us", retries=3) { let tries = 0; let success = false; const r_url = `https://www.reddit.com${postObject.permalink}.json`; const linkArray = postObject.permalink.split("/"); const fileName = linkArray[linkArray.length-2].replace(" ", "-"); while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { await page.setUserAgent(DEFAULT_USER_AGENT); await page.goto(r_url); const commentData = await page.$eval("pre", pre => pre.textContent); if (!commentData) { throw new Error(`No comment data found: ${fileName}`); } const comments = JSON.parse(commentData); const commentsList = comments[1].data.children; for (const comment of commentsList) { if (comment.kind !== "more") { const data = comment.data; const commentData = { name: data.author, body: data.body, upvotes: data.ups } console.log("Comment Data:", commentData); success = true; } } } catch (e) { await page.screenshot({path: "error.png"}); console.log(`Error fetching comments for ${fileName}`); tries++; } finally { await page.close(); } }}
comment.kind
is not "more"
, we assume that these are comments we want to process.We pull the author
, body
, and upvotes
for each individual comment. If someone wants to analyze this data at scale, they can then accurately compare which types of comments get the best reactions from people. We read the saved posts back with readCsv()
, which allows us to read individual rows from the CSV file. We'll call processPost()
on each row we read from the file.Here is the full code example that reads rows from the CSV file and processes them. We have an additional function, processPosts()
. It uses a for loop as just a placeholder for now, but later on, this function will be rewritten for better concurrency with async
support.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = "YOUR-SUPER-SECRET-API-KEY";const DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { return; } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); console.log(`successfully wrote data to ${outputFile}`); } catch (e) { console.log(`failed to write to csv: ${e}`); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function getPosts(browser, feed, limit=10, retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { const url = `https://www.reddit.com/r/${feed}.json?limit=${limit}`; await page.setUserAgent(DEFAULT_USER_AGENT); await page.goto(getScrapeOpsUrl(url)); success = true; const jsonText = await page.$eval("pre", pre => pre.textContent); const resp = JSON.parse(jsonText); if (resp) { const children = resp.data.children; for (const child of children) { data = child.data; const articleData = { name: data.title, author: data.author, permalink: data.permalink, upvoteRatio: data.upvote_ratio } if (!namesSeen.includes(articleData.name)) { await writeToCsv([articleData], `./${feed}.csv`); namesSeen.push(articleData.name); } } } } catch (e) { console.log(`ERROR: ${e}`); tries++; } finally { await page.close(); } }} async function processPost(browser, postObject, location="us", retries=3) { let tries = 0; let success = false; const r_url = `https://www.reddit.com${postObject.permalink}.json`; const linkArray = postObject.permalink.split("/"); const fileName = linkArray[linkArray.length-2].replace(" ", "-"); while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { await page.setUserAgent(DEFAULT_USER_AGENT); await page.goto(r_url); const commentData = await page.$eval("pre", pre => pre.textContent); if (!commentData) { throw new Error(`No comment data found: ${fileName}`); } const comments = JSON.parse(commentData); const commentsList = comments[1].data.children; for (const comment of commentsList) { if (comment.kind !== "more") { const data = comment.data; const commentData = { name: data.author, body: data.body, upvotes: data.ups } console.log("Comment Data:", commentData); success = true; } } } catch (e) { await page.screenshot({path: "error.png"}); console.log(`Error fetching comments for ${fileName}`); tries++; } finally { await page.close(); } }} async function processPosts(browser, inputFile, location="us", retries=3) { const posts = await readCsv(inputFile); for (const post of posts) { await processPost(browser, post); }} async function main() { const FEEDS = ["news"]; const 
RETRIES = 4; const BATCH_SIZE = 1; AGGREGATED_FEEDS = []; const browser = await puppeteer.launch(); for (const feed of FEEDS) { await getPosts(browser, feed, BATCH_SIZE, RETRIES); AGGREGATED_FEEDS.push(`${feed}.csv`); } for (const individualFile of AGGREGATED_FEEDS) { await processPosts(browser, individualFile, RETRIES); } await browser.close();} main();
processPosts()
to read all the data from a Subreddit CSV. This function runs processPost()
on each individual post so we can extract important comment data from the post.commentData
object and pass it into our writeCsv()
function. Just like before, we write each object as soon as we've extracted the data. This allows us to write as much data as possible in the event of a crash.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = "YOUR-SUPER-SECRET-API-KEY"; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { return; } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); console.log(`successfully wrote data to ${outputFile}`); } catch (e) { console.log(`failed to write to csv: ${e}`); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function getPosts(browser, feed, limit=10, retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { const url = `https://www.reddit.com/r/${feed}.json?limit=${limit}`; await page.setUserAgent(DEFAULT_USER_AGENT); await page.goto(getScrapeOpsUrl(url)); success = true; const jsonText = await page.$eval("pre", pre => pre.textContent); const resp = JSON.parse(jsonText); if (resp) { const children = resp.data.children; for (const child of children) { data = child.data; const articleData = { name: data.title, author: data.author, permalink: data.permalink, upvoteRatio: data.upvote_ratio } if (!namesSeen.includes(articleData.name)) { await writeToCsv([articleData], `./${feed}.csv`); namesSeen.push(articleData.name); } } } } catch (e) { console.log(`ERROR: ${e}`); tries++; } finally { await page.close(); } }} async function processPost(browser, postObject, location="us", retries=3) { let tries = 0; let success = false; const r_url = `https://www.reddit.com${postObject.permalink}.json`; const linkArray = postObject.permalink.split("/"); const fileName = linkArray[linkArray.length-2].replace(" ", "-"); while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { await page.goto(r_url); const commentData = await page.$eval("pre", pre => pre.textContent); if (!commentData) { throw new Error(`No comment data found: ${fileName}`); } const comments = JSON.parse(commentData); const commentsList = comments[1].data.children; for (const comment of commentsList) { if (comment.kind !== "more") { const data = comment.data; const commentData = { name: data.author, body: data.body, upvotes: data.ups } await writeToCsv([commentData], `${fileName}.csv`); success = true; } } } catch (e) { await page.screenshot({path: "error.png"}); console.log(`Error fetching comments for ${fileName}`); tries++; } finally { await page.close(); } }} async function processPosts(browser, inputFile, location="us", retries=3) { const posts = await readCsv(inputFile); for (const post of posts) { await processPost(browser, post); }} async function main() { const FEEDS = ["news"]; const RETRIES = 4; const 
BATCH_SIZE = 1; AGGREGATED_FEEDS = []; const browser = await puppeteer.launch(); for (const feed of FEEDS) { await getPosts(browser, feed, BATCH_SIZE, RETRIES); AGGREGATED_FEEDS.push(`${feed}.csv`); } for (const individualFile of AGGREGATED_FEEDS) { await processPosts(browser, individualFile, RETRIES); } await browser.close();} main();
commentData
objects out of them. We then write these objects to CSV files just like we did before when we were parsing the articles themselves from the Subreddit feed. This code might get you blocked. Puppeteer is faster than any human could possibly be, and Reddit will notice the abnormalities. Once we've added concurrency, we're going to add proxy support to our scraper as well. To add that concurrency, we use Promise.all()
in combination with async
/await
support. Because this makes the scraper dramatically faster, it's also going to increase our likelihood of getting blocked, so adding proxy support in the next section is super important! Here is our new processPosts()
:async function processPosts(browser, inputFile, concurrencyLimit, location="us", retries=3) { const posts = await readCsv(inputFile); while (posts.length > 0) { const currentBatch = posts.splice(0, concurrencyLimit); const tasks = currentBatch.map(post => processPost(browser, post, location, retries)); try { await Promise.all(tasks); } catch (e) { console.log("Failed to process batch"); } } }
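If it helps, this is the same batching idea the Python half of this guide used, just expressed with promises instead of threads. As a rough, hypothetical sketch in Python terms: split the rows into chunks of the concurrency limit and wait for each chunk to finish before starting the next.

from concurrent.futures import ThreadPoolExecutor

def process_posts_in_batches(rows, concurrency_limit=10):
    # Mirror of the JavaScript splice/Promise.all loop: take a chunk,
    # process the whole chunk, then move on to the next one.
    while rows:
        current_batch = rows[:concurrency_limit]
        del rows[:concurrency_limit]  # shrink the list as we go, like splice()
        with ThreadPoolExecutor(max_workers=concurrency_limit) as executor:
            list(executor.map(process_post, current_batch))  # wait for the batch, like Promise.all()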
splice()
a chunk out of our array and run processPost()
on each row in that chunk. This shrinks the array as we go and frees up memory, so instead of getting bogged down toward the end of the run, the scraper keeps working through the queue efficiently. Once our posts
array is down to 0, we can then exit the function.processPost()
, we only need to change one line: await page.goto(getScrapeOpsUrl(r_url));
. We once again pass getScrapeOpsUrl()
directly into page.goto()
so our scraper navigates directly to the proxied url.Here is our final script that makes full use of both the crawler and scraper.const puppeteer = require("puppeteer");const createCsvWriter = require("csv-writer").createObjectCsvWriter;const csvParse = require("csv-parse");const fs = require("fs"); const API_KEY = "YOUR-SUPER-SECRET-API-KEY"; async function writeToCsv(data, outputFile) { if (!data || data.length === 0) { throw new Error("No data to write!"); } const fileExists = fs.existsSync(outputFile); const headers = Object.keys(data[0]).map(key => ({id: key, title: key})) const csvWriter = createCsvWriter({ path: outputFile, header: headers, append: fileExists }); try { await csvWriter.writeRecords(data); } catch (e) { throw new Error("Failed to write to csv"); }} async function readCsv(inputFile) { const results = []; const parser = fs.createReadStream(inputFile).pipe(csvParse.parse({ columns: true, delimiter: ",", trim: true, skip_empty_lines: true })); for await (const record of parser) { results.push(record); } return results;} function getScrapeOpsUrl(url, location="us") { const params = new URLSearchParams({ api_key: API_KEY, url: url, country: location }); return `https://proxy.scrapeops.io/v1/?${params.toString()}`;} async function getPosts(browser, feed, limit=10, retries=3) { let tries = 0; let success = false; while (tries <= retries && !success) { const page = await browser.newPage(); const namesSeen = []; try { const url = `https://www.reddit.com/r/${feed}.json?limit=${limit}`; await page.goto(getScrapeOpsUrl(url)); success = true; const jsonText = await page.$eval("pre", pre => pre.textContent); const resp = JSON.parse(jsonText); if (resp) { const children = resp.data.children; for (const child of children) { data = child.data; const articleData = { name: data.title, author: data.author, permalink: data.permalink, upvoteRatio: data.upvote_ratio } if (!namesSeen.includes(articleData.name)) { try { await writeToCsv([articleData], `./${feed}.csv`); namesSeen.push(articleData.name); } catch { throw new Error("failed to write csv file:", articleData); } } } } } catch (e) { console.log(`ERROR: ${e}`); tries++; } finally { await page.close(); } }} async function processPost(browser, postObject, location="us", retries=3) { let tries = 0; let success = false; const r_url = `https://www.reddit.com${postObject.permalink}.json`; const linkArray = postObject.permalink.split("/"); const fileName = linkArray[linkArray.length-2].replace(" ", "-"); while (tries <= retries && !success) { const page = await browser.newPage(); try { await page.goto(getScrapeOpsUrl(r_url), {timeout: 30000}); const commentData = await page.$eval("pre", pre => pre.textContent); if (!commentData) { throw new Error(`No comment data found: ${fileName}`); } const comments = JSON.parse(commentData); const commentsList = comments[1].data.children; if (commentsList.length === 0) { return; } for (const comment of commentsList) { if (comment.kind !== "more") { const data = comment.data; const commentData = { name: data.author, body: data.body, upvotes: data.ups } await writeToCsv([commentData], `${fileName}.csv`); success = true; } } } catch (e) { await page.screenshot({path: `ERROR-${fileName}.png`}); console.log(`Error fetching comments for ${fileName}, retries left: ${retries - tries}`); tries++; } finally { await page.close(); } } if (!success) { console.log(`Max retries exceeded for: ${postObject.permalink}`); return; } return;} async function processPosts(browser, inputFile, concurrencyLimit, location="us", 
retries=3) { const posts = await readCsv(inputFile); while (posts.length > 0) { const currentBatch = posts.splice(0, concurrencyLimit); const tasks = currentBatch.map(post => processPost(browser, post, location, retries)); try { await Promise.all(tasks); } catch (e) { console.log("Failed to process batch"); } } } async function main() { const FEEDS = ["news"]; const RETRIES = 4; const BATCH_SIZE = 100; const concurrencyLimit = 20; AGGREGATED_FEEDS = []; const browser = await puppeteer.launch(); for (const feed of FEEDS) { await getPosts(browser, feed, BATCH_SIZE, RETRIES); AGGREGATED_FEEDS.push(`${feed}.csv`); } for (const individualFile of AGGREGATED_FEEDS) { await processPosts(browser, individualFile,concurrencyLimit, RETRIES); } await browser.close();} main();
async function main() { const FEEDS = ["news"]; const RETRIES = 4; const BATCH_SIZE = 100; const concurrencyLimit = 20; AGGREGATED_FEEDS = []; const browser = await puppeteer.launch(); for (const feed of FEEDS) { await getPosts(browser, feed, BATCH_SIZE, RETRIES); AGGREGATED_FEEDS.push(`${feed}.csv`); } for (const individualFile of AGGREGATED_FEEDS) { await processPosts(browser, individualFile,concurrencyLimit, RETRIES); } await browser.close();}
FEEDS
list. If you'd like to change concurrencyLimit
feel free to do so.In testing, we found that 20 pages was optimal and after that, we began to see bad results more often. Remember, your concurrencyLimit
is quite literally the number of pages you have open in the browser. If you set the limit to 100, Puppeteer will attempt to do all this work with 100 pages open, and you will probably run into both performance issues and issues with the ScrapeOps API; you can only have so many concurrent pages open with the ScrapeOps API. In the production run, we generated 100 CSV files, all full of processed comments and metadata. It took 1 minute and 41 seconds to create our article list and generate all 100 of the reports... That is lightning fast. During peak hours (when Reddit and ScrapeOps are being accessed more often), this same script has taken up to 3 minutes and 40 seconds. Bear in mind that the speed of our responses depends on both the Reddit server and the ScrapeOps server, so your results will probably vary. If you notice your scraper getting stuck or moving too slowly, decrease your concurrencyLimit
.