Then check out ScrapeOps, the complete toolkit for web scraping.
config.json file in it (place your ScrapeOps API key inside this file). It should look similar to the example below.

```json
{
    "api_key": "YOUR-SUPER-SECRET-API-KEY"
}
```
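If you'd like to confirm that the key loads correctly before running the scraper, a quick sanity check (using the same pattern the scripts below rely on) looks like this:

```python
import json

# Load the ScrapeOps API key from config.json, just like the scraper scripts do
with open("config.json", "r") as config_file:
    config = json.load(config_file)

API_KEY = config["api_key"]
print(API_KEY)  # should print your key, not an empty string
```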
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") soup = BeautifulSoup(response.text, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def 
process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel_content, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
Feel free to replace `channel_list` with the channels you'd like to scrape.
Then run the file with the following command:

```bash
python name_of_your_python_file.py
```

You can also change any of the following constants in `main` as well:

- `MAX_RETRIES`: the maximum number of times the script will attempt to retry scraping a particular TikTok channel or its content if the initial request fails. Increase `MAX_RETRIES` if you want the script to be more persistent.
- `MAX_THREADS`: the number of threads the script uses for concurrent processing, i.e. how many channels or content pages it can scrape simultaneously. Increase `MAX_THREADS` to speed up the scraping process, especially if you have a large number of channels to scrape.
- `LOCATION`: the geographical location from which the scraping requests should appear to originate. This is useful because TikTok content can vary depending on the user's location due to regional restrictions or content preferences.

Here is the URL of one of the channels we'll be scraping:

https://www.tiktok.com/@paranormalpodcast
All TikTok channel URLs follow this format:

```
https://www.tiktok.com/@{name_of_channel}
```
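Inside the scraper, these URLs are built with a simple f-string. A minimal sketch (the channel name is just an example):

```python
# Build a channel URL from a channel name (example value)
channel_name = "paranormalpodcast"
url = f"https://www.tiktok.com/@{channel_name}"
print(url)  # https://www.tiktok.com/@paranormalpodcast
```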
We can parse it with the `json` module and index through it like any other `dict` object. Take a look below.

The data we're looking for is a `script` element with an id of `__UNIVERSAL_DATA_FOR_REHYDRATION__`. This is the data that TikTok uses to start building the page, and it's the data we're going to scrape.

To control our geolocation, we use the ScrapeOps `country` param. If we pass `"country": "us"`, ScrapeOps will route us through a server in the US. If we pass `"uk"` in as our country, ScrapeOps will route us through the UK.

Create a new project folder and move into it:

```bash
mkdir tiktok-scraper
cd tiktok-scraper
```
Create a new virtual environment and activate it:

```bash
python -m venv venv
source venv/bin/activate
```

Install the dependencies:

```bash
pip install requests
pip install beautifulsoup4
```

To extract our data, we need to pull a `script` (JavaScript) element from the page. Embedded within this JavaScript is a JSON blob. The JSON blob holds all sorts of interesting information about the channel.

Along with some basic structure and retry logic, this script does exactly that. Take a look at the Python script below.

```python
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_channel(channel_name, location, retries=3):
    url = f"https://www.tiktok.com/@{channel_name}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
            json_data = json.loads(script_tag.text)
            user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
            stats = user_info["stats"]

            follower_count = stats["followerCount"]
            likes = stats["heartCount"]
            video_count = stats["videoCount"]

            user_data = user_info["user"]
            unique_id = user_data["uniqueId"]
            nickname = user_data["nickname"]
            verified = user_data["verified"]
            signature = user_data["signature"]

            profile_data = {
                "name": unique_id,
                "follower_count": follower_count,
                "likes": likes,
                "video_count": video_count,
                "nickname": nickname,
                "verified": verified,
                "signature": signature
            }

            print(profile_data)

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(channel_list, location, max_threads=5, retries=3):
    for channel in channel_list:
        scrape_channel(channel, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Scrape starting...")

    ## INPUT ---> List of keywords to scrape
    channel_list = [
        "paranormalpodcast",
        "theparanormalfiles",
        "jdparanormal",
        "paranormal.com7",
        "paranormal064",
        "marijoparanormal",
        "paranormal_activityghost",
        "youtube_paranormal",
        "paranormal140",
        "paranormal.51"
    ]

    ## Job Processes
    start_scrape(channel_list, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Scrape complete.")
```
While we still have tries left and the operation has not succeeded:

- We find the embedded JSON with `soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")`.
- `json.loads(script_tag.text)` converts the text of the script object into a dict we can index from Python.

From that dict, we pull the following fields:

- `name`
- `follower_count`
- `likes`
- `video_count`
- `nickname`
- `verified`
- `signature`

To store this data properly, we add two classes: `ProfileData` and `DataPipeline`.

- `ProfileData` is used specifically for holding information from the profiles we scrape.
- The `DataPipeline` object takes a dataclass (in this case `ProfileData`) and pipes it into a CSV file while removing duplicates.

Here is our `ProfileData` class.

```python
@dataclass
class ProfileData:
    name: str = ""
    follower_count: int = 0
    likes: int = 0
    video_count: int = 0
    nickname: str = ""
    verified: bool = False
    signature: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
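To see the cleanup from `__post_init__()` in action, here's a quick hypothetical usage sketch:

```python
# Hypothetical example: empty string fields get default text, the rest get stripped
profile = ProfileData(
    name="paranormalpodcast ",   # trailing space gets stripped
    follower_count=1000,
    signature=""                 # empty string becomes "No signature"
)

print(profile.name)       # "paranormalpodcast"
print(profile.signature)  # "No signature"
```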
Here is our `DataPipeline`.

```python
class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
```

Note: `close_pipeline()` calls `time.sleep()`, so make sure `import time` is added to the imports at the top of your script.
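Here's a short, hypothetical sketch of how `ProfileData` and `DataPipeline` fit together (the filename and values are made up):

```python
# Hypothetical usage: pipe ProfileData objects into a CSV, duplicates dropped by name
pipeline = DataPipeline(csv_filename="example-profiles.csv")

pipeline.add_data(ProfileData(name="channel_one", follower_count=100))
pipeline.add_data(ProfileData(name="channel_one", follower_count=100))  # duplicate, logged and dropped
pipeline.add_data(ProfileData(name="channel_two", follower_count=250))

# Flush anything still sitting in the storage queue to disk
pipeline.close_pipeline()
# example-profiles.csv now holds a header plus two rows
```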
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, retries=3): for channel in channel_list: scrape_channel(channel, location, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Scrape starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Scrape complete.")
To add concurrency, we use `ThreadPoolExecutor` to spawn `scrape_channel()` on multiple threads. This greatly increases our speed and efficiency.

The code snippet below replaces our `for` loop and runs `scrape_channel()` with `ThreadPoolExecutor`.

```python
def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_channel,
            channel_list,
            [location] * len(channel_list),
            [data_pipeline] * len(channel_list),
            [retries] * len(channel_list)
        )
```
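If `executor.map()` with several argument lists is unfamiliar, here is a tiny standalone sketch (unrelated to the scraper) showing how the lists line up, one element per call:

```python
import concurrent.futures

def greet(name, greeting):
    # Each call receives one name and the matching greeting
    print(f"{greeting}, {name}!")

names = ["alice", "bob", "carol"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Runs greet("alice", "Hello"), greet("bob", "Hello"), greet("carol", "Hello")
    executor.map(greet, names, ["Hello"] * len(names))
```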
executor.map():scrape_channel tells executor to run scrape_channel() on every available thread.channel_list is the list of channels we want to pass into scrape_channel().location, data_pipeline, and retries in as arrays to be passed to each individual thread.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Scrape starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Scrape complete.")
```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
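To get a feel for what this function returns, here's a hypothetical call (the key is shown as a placeholder):

```python
# Hypothetical call showing the shape of the proxied URL (key shown as a placeholder)
proxied = get_scrapeops_url("https://www.tiktok.com/@paranormalpodcast", location="uk")
print(proxied)
# https://proxy.scrapeops.io/v1/?api_key=<YOUR-KEY>&url=https%3A%2F%2Fwww.tiktok.com%2F%40paranormalpodcast&country=uk&residential=True
```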
get_scrapeops_url() takes in a number of arguments and converts any url into a ScrapeOps proxied url. Here are the individual arguments."api_key": is your ScrapeOps API key."url": is the url that you'd like to scrape."country": is the location you'd like to be routed through."residential": is a boolean value. When we set residential to True, we're telling ScrapeOps that we want a residential IP address. Anti-bots are far less likely to block a residential IP than a data center IP.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Scrape starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Scrape complete.")
Take a look at our `main`. `MAX_RETRIES` is set to 3, `MAX_THREADS` is set to 5, and our location is set to `"uk"`. Feel free to change any of these constants.

```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Scrape starting...")

    ## INPUT ---> List of keywords to scrape
    channel_list = [
        "paranormalpodcast",
        "theparanormalfiles",
        "jdparanormal",
        "paranormal.com7",
        "paranormal064",
        "marijoparanormal",
        "paranormal_activityghost",
        "youtube_paranormal",
        "paranormal140",
        "paranormal.51"
    ]

    ## Job Processes
    crawl_pipeline = DataPipeline(csv_filename="channels.csv")
    start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Scrape complete.")
```
Next we'll add a `wait` parameter to the ScrapeOps URL, and we'll pull some data out of some incredibly nested elements. This scraper needs to read our channel CSV, fetch each channel's page, pull the videos out of it, and save them to a new CSV.

We'll start with our `scrape_channel_content()` function. It looks a lot like our first parsing function.

```python
def scrape_channel_content(row, location, retries):
    url = f"https://www.tiktok.com/@{row['name']}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            main_content = soup.select_one("div[id='main-content-others_homepage']")
            links = main_content.find_all("a")
            for link in links:
                href = link.get("href")
                if row["name"] not in href:
                    continue
                views = 0
                views_present = link.select_one("strong[data-e2e='video-views']")
                if views_present:
                    views = views_present.text
                video_data = {
                    "name": href.split("/")[-1],
                    "url": href,
                    "views": views
                }
                print(video_data)

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
```
- We find all of the link elements with `main_content.find_all("a")`.
- We pull the `views`, `url`, and `name` (id number) from each link element.

Next, we need a trigger function similar to `start_scrape()`. We'll call this one `process_results()`. This function reads our CSV into an array object. Then it iterates through all the rows of the array and calls `scrape_channel_content()` on them.

Here is `process_results()`.

```python
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            scrape_channel_content(row, location, retries=retries)
```
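For context, each `row` that `csv.DictReader` hands to `scrape_channel_content()` is a plain dict keyed by the CSV header, so `row['name']` holds the channel's unique id. A hypothetical row might look like this:

```python
# Hypothetical row from channels.csv, as produced by csv.DictReader
row = {
    "name": "paranormalpodcast",
    "follower_count": "14000",   # note: DictReader yields strings, not ints
    "likes": "250000",
    "video_count": "320",
    "nickname": "Paranormal Podcast",
    "verified": "False",
    "signature": "No signature"
}

print(f"https://www.tiktok.com/@{row['name']}")  # the URL the content scraper will fetch
```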
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text video_data = { "name": href.split("/")[-1], "url": href, "views": views } print(video_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: 
scrape_channel_content(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, retries=MAX_RETRIES) logger.info("Content scrape complete")
`process_results()` reads our CSV file into an array. It then runs `scrape_channel_content()` on each row from the file.

Since we already have a `DataPipeline`, we just need a dataclass to pass into it. We'll call this one `VideoData`. This class will hold the following:

- `name`: the unique number given to the photo or video.
- `url`: the url where we can find the photo or video.
- `views`: the amount of views that the photo or video has received.

```python
@dataclass
class VideoData:
    name: str = ""
    url: str = ""
    views: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
```
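The `name` is taken from the last path segment of the video link, which is exactly what `href.split("/")[-1]` returns. A quick sketch with a made-up link:

```python
# Hypothetical video href of the kind found on a channel page
href = "https://www.tiktok.com/@paranormalpodcast/video/7301234567890123456"

video_id = href.split("/")[-1]
print(video_id)  # 7301234567890123456
```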
DataPipeline and pass VideoData objects into it.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") soup = BeautifulSoup(response.text, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, retries=3): logger.info(f"processing 
{csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: scrape_channel_content(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, retries=MAX_RETRIES) logger.info("Content scrape complete")
This new dataclass gives us almost everything we need to properly scrape the content from all of these channels.

Next, we rewrite `process_results()` to take advantage of multithreading with `ThreadPoolExecutor`.

```python
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                scrape_channel_content,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
```
In the multithreaded `process_results()` function, we now pass `scrape_channel_content` in as our first argument. Then we pass in the `reader` object (an array of dict objects we want to process). `location` and `retries` both get passed in as arrays as well.

We already wrote `get_scrapeops_url()`. Before we call it again in our code, we're going to add one more argument to it: `"wait": 2000`. This tells the ScrapeOps server to wait 2 seconds (2,000 milliseconds) for content to render before sending the page back to us. We need to do this so that the videos and photos from these channels can be fetched and loaded into the page.

Here is our finished proxy function.

```python
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
```
```python
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
```
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") soup = BeautifulSoup(response.text, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def 
process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel_content, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
Here is the `main` we'll be working with. Feel free to change any of the following constants:

- `MAX_RETRIES`
- `MAX_THREADS`
- `LOCATION`
- `channel_list`

```python
if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    channel_list = [
        "paranormalpodcast",
        "theparanormalfiles",
        "jdparanormal",
        "paranormal.com7",
        "paranormal064",
        "marijoparanormal",
        "paranormal_activityghost",
        "youtube_paranormal",
        "paranormal140",
        "paranormal.51"
    ]

    ## Job Processes
    crawl_pipeline = DataPipeline(csv_filename="channels.csv")
    start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")

    logger.info("Starting content scrape...")
    process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    logger.info("Content scrape complete")
```
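When this runs, the crawl writes `channels.csv` (one row per profile), and the content scrape then writes a separate `<channel name>.csv` for each channel. Based on the `ProfileData` and `VideoData` fields, the CSV headers should look roughly like this (a sketch, not captured output):

```
# channels.csv
name,follower_count,likes,video_count,nickname,verified,signature

# <channel name>.csv (e.g. paranormalpodcast.csv)
name,url,views
```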
Always pay attention to TikTok's terms of service and robots.txt. You may view TikTok's terms here, and their robots.txt is available here. It's important to examine both of these files because violating them can get you blocked or even permanently banned.

If you are unsure of the legality of a scraping project, generally public data (data not behind a login) is public information and therefore fair game when scraping. If your data is gated behind a login or some other type of authentication, this data is considered private property and you will be subject to individual privacy and intellectual property laws. Any time you're not sure whether your data is public or private, make sure to consult an attorney.
config.json file.import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "json_response": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_page = html.unescape(page["body"]) soup = BeautifulSoup(decoded_page, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href or "https://www.tiktok.com" not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text name = href.split("/")[-1] try: int(name) except: logger.info(f"Ad found, skipping item: {name}") continue video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while 
processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel_content, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
Here is a breakdown of the constants inside main:

- MAX_RETRIES: Defines the maximum number of times the script will attempt to scrape a URL if an error occurs during the process.
- MAX_THREADS: Specifies the maximum number of concurrent threads that can run at the same time during the scraping process.
- LOCATION: Defines the geographical location to simulate while sending requests to the target website.
- channel_list: A list of TikTok channel usernames that the script is set to scrape.

Here is an example of a channel URL: https://www.tiktok.com/@paranormalpodcast

Channel URLs are laid out in the format below (a tiny sketch of how they're built follows it).
https://www.tiktok.com/@{name_of_channel}
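As a tiny illustration, here is how those URLs get built in code. The loop mirrors the f-string used inside scrape_channel(); the channel names are the ones used throughout this article:

channel_list = ["paranormalpodcast", "theparanormalfiles", "jdparanormal"]

for channel_name in channel_list:
    # Each profile lives at https://www.tiktok.com/@{name_of_channel}
    url = f"https://www.tiktok.com/@{channel_name}"
    print(url)
# https://www.tiktok.com/@paranormalpodcast
# https://www.tiktok.com/@theparanormalfiles
# https://www.tiktok.com/@jdparanormal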
Each channel page contains a script (JavaScript) element packed with all the necessary data to load the channel page. If you look in the screenshot below, you can see evidence of this. You might notice the id: __UNIVERSAL_DATA_FOR_REHYDRATION__. With this unique information, we can write a CSS selector to find this tag on the page: script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__'].

All of our videos on the page are embedded inside a (anchor) elements. The CSS here is jumbled nonsense, so to extract these, we'll have to get a bit more creative than just a CSS selector. If you look below, the href of each a element follows a pretty uniform structure. These videos are laid out like this:

https://www.tiktok.com/@paranormalpodcast/video/7423989765457857798
We can use these href values to filter our links:

https://www.tiktok.com/@{name_of_channel}/video/{id_number}
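To make that concrete, here is a minimal, standalone sketch of filtering hrefs against this pattern. The helper name is_video_link and the sample links are made up for illustration; the real scraper applies the same checks inline:

def is_video_link(href, channel_name):
    # Keep only links that point at the channel's own videos and end in a numeric id.
    if channel_name not in href or "https://www.tiktok.com" not in href:
        return False
    return href.split("/")[-1].isdigit()

sample_links = [
    "https://www.tiktok.com/@paranormalpodcast/video/7423989765457857798",
    "https://www.tiktok.com/@someotherchannel/video/123",
    "/legal/privacy-policy",
]

print([href for href in sample_links if is_video_link(href, "paranormalpodcast")])
# ['https://www.tiktok.com/@paranormalpodcast/video/7423989765457857798']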
To control our geolocation, Proxy Aggregator gives us the country parameter. We simply need to pass a country code into this param. Proxy Aggregator reads the country code and routes the request through our desired country. If we wish to show up in the US, our country code would be us. (A brief usage example follows the table below.)

| Country | Country Code |
|---|---|
| Brazil | br |
| Canada | ca |
| China | cn |
| India | in |
| Italy | it |
| Japan | jp |
| France | fr |
| Germany | de |
| Russia | ru |
| Spain | es |
| United States | us |
| United Kingdom | uk |
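For example, here is how the location feeds into the proxy URL. This sketch reuses the get_scrapeops_url() function defined earlier in this article; only the country code differs between the two calls:

# The same profile, routed through two different countries.
us_url = get_scrapeops_url("https://www.tiktok.com/@paranormalpodcast", location="us")
uk_url = get_scrapeops_url("https://www.tiktok.com/@paranormalpodcast", location="uk")
# Only the country= parameter in the resulting proxy URLs changes.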
Create a new project folder, then cd into the folder.

mkdir tiktok-selenium
cd tiktok-selenium
python -m venv venv
source venv/bin/activate

pip install selenium
pip install beautifulsoup4

scrape_channel() performs our actual scraping logic. start_scrape() is used to trigger a scrape. Our runtime is held inside the main block.

import os
import csv
import json
import logging
import html
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_channel(channel_name, location, retries=3):
    url = f"https://www.tiktok.com/@{channel_name}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)

            ## Extract Data
            json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
            page = json.loads(json_stuff)
            decoded_chunk = html.unescape(page["body"])

            soup = BeautifulSoup(decoded_chunk, "html.parser")
            script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
            json_data = json.loads(script_tag.text)
            user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]

            stats = user_info["stats"]
            follower_count = stats["followerCount"]
            likes = stats["heartCount"]
            video_count = stats["videoCount"]

            user_data = user_info["user"]
            unique_id = user_data["uniqueId"]
            nickname = user_data["nickname"]
            verified = user_data["verified"]
            signature = user_data["signature"]

            profile_data = {
                "name": unique_id,
                "follower_count": follower_count,
                "likes": likes,
                "video_count": video_count,
                "nickname": nickname,
                "verified": verified,
                "signature": signature
            }

            print(profile_data)
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(channel_list, location, data_pipeline=None, retries=3):
    for channel in channel_list:
        # scrape_channel() doesn't accept a data_pipeline yet, so we only pass what it takes
        scrape_channel(channel, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    channel_list = [
        "paranormalpodcast",
        "theparanormalfiles",
        "jdparanormal",
        "paranormal.com7",
        "paranormal064",
        "marijoparanormal",
        "paranormal_activityghost",
        "youtube_paranormal"
    ]

    ## Job Processes
    start_scrape(channel_list, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
Here is what the extraction steps do:

- driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") finds our JSON response from Proxy Aggregator. This will not actually work until we've hooked up the proxy. We take our response as JSON to prevent the page from being altered and rendered. If Selenium renders the page, we get an automatic redirect that skips over our target page.
- We decode the response body with html.unescape(page["body"]) and pass it into BeautifulSoup for parsing. Selenium's extraction features are built to interact with the page, and in this case, we don't want to interact with it. We want to keep it intact.
- We find our data-laden script tag with soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']"). Keeping the processing inside BeautifulSoup allows us to dig through the data without any page interactions that could potentially alter the page or redirect us.

At the moment, our extracted profile is held in a plain dict. This is great when we're just trying to get things working, but these data structures don't cover edge cases. For production use, we should use custom, strongly typed objects. We also need a way to pipe these objects into a CSV file.

This is our ProfileData class. We'll use it to replace the dict we used earlier. If a field is missing in this object, it gets replaced by a default value: No {field.name}. Especially when dealing with data used by all sorts of moving parts (people, crawlers, scrapers, etc.), this is a much safer approach to prevent corruption.

@dataclass
class ProfileData:
    name: str = ""
    follower_count: int = 0
    likes: int = 0
    video_count: int = 0
    nickname: str = ""
    verified: bool = False
    signature: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
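Before we move on to storage, here is a condensed sketch of the extraction flow from the bullets above, pulled into one helper for clarity. The function name fetch_rehydration_data is ours, not part of the article's scraper, and it assumes the URL has already been wrapped by the proxy function shown later:

import json
import html
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup

def fetch_rehydration_data(proxied_url, options):
    # The proxied response arrives as JSON inside a <pre> tag rather than a rendered page.
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(proxied_url)
        raw_json = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
        page = json.loads(raw_json)
        # Decode the HTML body and parse it statically, without letting Selenium render it.
        soup = BeautifulSoup(html.unescape(page["body"]), "html.parser")
        script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
        return json.loads(script_tag.text)
    finally:
        driver.quit()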
Next, we need a DataPipeline. While it's waiting to be saved, our data gets held in a storage_queue. is_duplicate() allows us to filter out duplicate items from the pipeline. When we close the pipeline, we sleep for 3 seconds to wait for any other operations to complete, then save the storage_queue to a CSV file. Note that close_pipeline() calls time.sleep(), so import time needs to be added to the imports at the top of the script.

class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
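Here is a quick usage sketch of the pipeline (assuming the ProfileData and DataPipeline classes above are in scope; the follower count is a made-up value):

import time  # required by DataPipeline.close_pipeline()

pipeline = DataPipeline(csv_filename="channels.csv")

# Items sit in the storage queue until the limit is reached or the pipeline is closed.
pipeline.add_data(ProfileData(name="paranormalpodcast", follower_count=1000))
pipeline.add_data(ProfileData(name="paranormalpodcast"))  # duplicate name -> logged and dropped

pipeline.close_pipeline()  # flushes the remaining queue to channels.csv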
dict to the console, we create a ProfileData object. Then, we pass it into the DataPipeline for safe and effective storage.import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, retries=3): for channel in channel_list: scrape_channel(channel, location, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
start_scrape() already takes in a list of channels to scrape. However, we iterate through them using a for loop. This isn't the most efficient process because our crawler needs to look them up one at a time. With ThreadPoolExecutor, we can parse multiple channels at the same time.

Here is our rewritten trigger function. The real magic comes from executor.map(). Take a look at the args we pass into it:

- scrape_channel: the function we want to call on each thread.
- channel_list: the list of channels we actually wish to crawl.
- All other arguments get passed in as lists the same length as channel_list. executor.map() then takes each arg from its respective list and passes it into a separate instance of scrape_channel.

def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_channel,
            channel_list,
            [location] * len(channel_list),
            [data_pipeline] * len(channel_list),
            [retries] * len(channel_list)
        )
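If the repeated-list pattern looks odd, here is a tiny standalone illustration of how executor.map() pairs up the argument lists. The greet function and its values are made up purely for the example:

import concurrent.futures

def greet(name, greeting):
    print(f"{greeting}, {name}!")

names = ["alice", "bob", "carol"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each call gets one element from each list:
    # greet("alice", "hello"), greet("bob", "hello"), greet("carol", "hello")
    executor.map(greet, names, ["hello"] * len(names))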
import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
Aside from our standard parameters (api_key, url), we have a couple of other parameters we need to get the right response from TikTok. We pass a number of milliseconds into wait, and Proxy Aggregator will wait that long for the page to render. We also need to set json_response to True. This is imperative so we can freeze the page. When we receive our JSON response, we pass the body into BeautifulSoup so we can parse the page non-interactively.

Here is the function that makes everything work.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "json_response": True,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
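As a quick sanity check, you can print a wrapped URL and confirm the extra parameters are present. API_KEY comes from your config.json, and the URL shown in the comment is just an illustration of the encoding:

proxied = get_scrapeops_url("https://www.tiktok.com/@paranormalpodcast", location="uk")
print(proxied)
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.tiktok.com%2F%40paranormalpodcast&country=uk&json_response=True&wait=2000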
import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "json_response": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
Our crawl is configured inside of main. Feel free to change any of the following to customize your results: MAX_RETRIES, MAX_THREADS, LOCATION, channel_list.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    channel_list = [
        "paranormalpodcast",
        "theparanormalfiles",
        "jdparanormal",
        "paranormal.com7",
        "paranormal064",
        "marijoparanormal",
        "paranormal_activityghost",
        "youtube_paranormal"
    ]

    ## Job Processes
    crawl_pipeline = DataPipeline(csv_filename="channels.csv")
    start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")
Remember, that includes our wait of 2 seconds per page. This level of speed is almost unfathomable!

Next up is scrape_channel_content(), our new parser. Like we did earlier, this function is built to take a JSON response from Proxy Aggregator in order to freeze the page and prevent it from interacting with the browser. First, we find our main_content. From there, we gather all of our links. All links must contain the profile name and TikTok's base domain. If they don't, they're likely ads or sponsored posts.

def scrape_channel_content(row, location, retries):
    url = f"https://www.tiktok.com/@{row['name']}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)

            ## Extract Data
            json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
            page = json.loads(json_stuff)
            decoded_page = html.unescape(page["body"])

            soup = BeautifulSoup(decoded_page, "html.parser")
            main_content = soup.select_one("div[id='main-content-others_homepage']")
            links = main_content.find_all("a")
            for link in links:
                href = link.get("href")
                if row["name"] not in href or "https://www.tiktok.com" not in href:
                    continue

                views = 0
                views_present = link.select_one("strong[data-e2e='video-views']")
                if views_present:
                    views = views_present.text

                name = href.split("/")[-1]
                try:
                    int(name)
                except:
                    logger.info(f"Ad found, skipping item: {name}")
                    continue

                video_data = {
                    "name": href.split("/")[-1],
                    "url": href,
                    "views": views
                }

                print(video_data)

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
Here is a breakdown of the parsing logic:

- json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") pulls the JSON response from the page containing the API response, which we load with json.loads().
- We then extract the body from the response and load it into BeautifulSoup for static processing. We cannot load it into the browser or we will get redirected.
- main_content.find_all("a") finds all of our links. We filter out any bad links that don't contain the proper information.
- views gets a default value of 0. link.select_one("strong[data-e2e='video-views']") checks to see if the video has views. If it does, we save the text of this element to views.
- name, the id number of the video, gets extracted from the href of the link.

Now we need a function that reads our crawler's CSV file and runs the parser on each row. As its name implies, process_results() does exactly that. With process_results(), we read the CSV file into an array of dict objects. We then iterate through them with a for loop and call scrape_channel_content() on each row. We'll replace this for loop with concurrency later on.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            scrape_channel_content(row, location, retries)
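To see the views lookup in isolation, here is a small illustration using BeautifulSoup on an invented HTML fragment (TikTok's real markup is far larger, but the selector works the same way):

from bs4 import BeautifulSoup

snippet = """
<a href="https://www.tiktok.com/@paranormalpodcast/video/7423989765457857798">
    <strong data-e2e="video-views">24.5K</strong>
</a>
"""

link = BeautifulSoup(snippet, "html.parser").find("a")

views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
    views = views_present.text

print(views)  # 24.5K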
import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "json_response": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_page = html.unescape(page["body"]) soup = BeautifulSoup(decoded_page, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href or "https://www.tiktok.com" not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text name = href.split("/")[-1] try: int(name) except: logger.info(f"Ad found, skipping item: {name}") continue video_data = { "name": href.split("/")[-1], "url": href, "views": views } print(video_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: 
{retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: scrape_channel_content(row, location, retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
We already have a working DataPipeline; we just need to feed it some dataclass objects. To do this, we'll write one more class. It will be similar to our ProfileData class from earlier. It will have the same methods, but our fields will be a bit different.

Take a look at VideoData. We use it to hold the name, url, and views for each video we scrape.

@dataclass
class VideoData:
    name: str = ""
    url: str = ""
    views: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
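Here is a quick look at the cleanup VideoData does on construction (the field values are made up; it assumes the class above is in scope):

video = VideoData(name="  7423989765457857798 ", url="", views="1.2M")
print(video)
# VideoData(name='7423989765457857798', url='No url', views='1.2M')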
DataPipeline from inside scrape_channel_content(). We feed our VideoData into it until we're done scraping. Once we've completed the parse, we close the pipeline.import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "json_response": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_page = html.unescape(page["body"]) soup = BeautifulSoup(decoded_page, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href or "https://www.tiktok.com" not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text name = href.split("/")[-1] try: int(name) except: logger.info(f"Ad found, skipping item: {name}") continue video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, 
retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: scrape_channel_content(row, location, retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
To speed things up, we just need to replace the for loop in our trigger function. ThreadPoolExecutor is an excellent tool for this.

Our rewritten function is in the snippet below. Our first arg is the function we wish to call, scrape_channel_content. Next, we pass in our CSV file data with reader. All other args get passed in as arrays the length of reader.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                scrape_channel_content,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
Finally, to route the parser through the proxy, we only need to change the driver.get() line from the parser:

scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "json_response": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_page = html.unescape(page["body"]) soup = BeautifulSoup(decoded_page, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href or "https://www.tiktok.com" not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text name = href.split("/")[-1] try: int(name) except: logger.info(f"Ad found, skipping item: {name}") continue video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while 
processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel_content, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
As before, feel free to change MAX_RETRIES, MAX_THREADS, LOCATION, and channel_list to tweak your results.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    channel_list = [
        "paranormalpodcast",
        "theparanormalfiles",
        "jdparanormal",
        "paranormal.com7",
        "paranormal064",
        "marijoparanormal",
        "paranormal_activityghost",
        "youtube_paranormal"
    ]

    ## Job Processes
    crawl_pipeline = DataPipeline(csv_filename="channels.csv")
    start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")

    logger.info("Starting content scrape...")
    process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    logger.info("Content scrape complete")