Then check out ScrapeOps, the complete toolkit for web scraping.
config.json file in it (place your ScrapeOps API key inside this file). It should look similar to what you see below.

{
    "api_key": "YOUR-SUPER-SECRET-API-KEY"
}
import os
import csv
import requests
import json
import logging
import time  # used by DataPipeline.close_pipeline()
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class ProfileData:
    name: str = ""
    follower_count: int = 0
    likes: int = 0
    video_count: int = 0
    nickname: str = ""
    verified: bool = False
    signature: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class VideoData:
    name: str = ""
    url: str = ""
    views: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
    url = f"https://www.tiktok.com/@{channel_name}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
            json_data = json.loads(script_tag.text)
            user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]

            stats = user_info["stats"]
            follower_count = stats["followerCount"]
            likes = stats["heartCount"]
            video_count = stats["videoCount"]

            user_data = user_info["user"]
            unique_id = user_data["uniqueId"]
            nickname = user_data["nickname"]
            verified = user_data["verified"]
            signature = user_data["signature"]

            profile_data = ProfileData(
                name=unique_id,
                follower_count=follower_count,
                likes=likes,
                video_count=video_count,
                nickname=nickname,
                verified=verified,
                signature=signature
            )
            data_pipeline.add_data(profile_data)

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_channel,
            channel_list,
            [location] * len(channel_list),
            [data_pipeline] * len(channel_list),
            [retries] * len(channel_list)
        )


def scrape_channel_content(row, location, retries):
    url = f"https://www.tiktok.com/@{row['name']}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
            soup = BeautifulSoup(response.text, "html.parser")
            main_content = soup.select_one("div[id='main-content-others_homepage']")
            links = main_content.find_all("a")

            for link in links:
                href = link.get("href")
                if row["name"] not in href:
                    continue

                views = 0
                views_present = link.select_one("strong[data-e2e='video-views']")
                if views_present:
                    views = views_present.text

                video_data = VideoData(
                    name=href.split("/")[-1],
                    url=href,
                    views=views
                )
                video_pipeline.add_data(video_data)

            success = True
            video_pipeline.close_pipeline()

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                scrape_channel_content,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    channel_list = [
        "paranormalpodcast",
        "theparanormalfiles",
        "jdparanormal",
        "paranormal.com7",
        "paranormal064",
        "marijoparanormal",
        "paranormal_activityghost",
        "youtube_paranormal",
        "paranormal140",
        "paranormal.51"
    ]

    ## Job Processes
    crawl_pipeline = DataPipeline(csv_filename="channels.csv")
    start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")

    logger.info("Starting content scrape...")
    process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    logger.info("Content scrape complete")
To customize your results, replace channel_list with the channels you'd like to scrape. Then run the script:

python name_of_your_python_file.py
If you'd like to fine-tune the scraper, you can change any of the following constants from main as well:

- MAX_RETRIES: Defines the maximum number of times the script will attempt to retry scraping a particular TikTok channel or its content if the initial request fails. Increase MAX_RETRIES if you want the script to be more persistent in trying to scrape a channel.
- MAX_THREADS: Determines the number of threads the script will use for concurrent processing, i.e. how many channels or content pages it can scrape simultaneously. Increase MAX_THREADS to speed up the scraping process, especially if you have a large number of channels to scrape.
- LOCATION: Specifies the geographical location from which the scraping requests should appear to originate. This is useful because TikTok content can vary depending on the user's location due to regional restrictions or content preferences.

Here is the URL for one of the channels we'll be scraping:

https://www.tiktok.com/@paranormalpodcast

Every channel page follows the same format:

https://www.tiktok.com/@{name_of_channel}
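If you'd like to sanity-check that format in code before crawling, here is a tiny sketch; the profile_url helper and the example channel are purely illustrative.

# Hypothetical helper: build the profile URL for a TikTok channel name.
def profile_url(channel_name: str) -> str:
    return f"https://www.tiktok.com/@{channel_name}"

print(profile_url("paranormalpodcast"))
# https://www.tiktok.com/@paranormalpodcast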
The channel data we want comes embedded in the page as JSON, so we can load it with the json module and index through it like any other dict object. Take a look below.

The blob sits inside a script element with an id of __UNIVERSAL_DATA_FOR_REHYDRATION__. This is the data that TikTok uses to start building the page and this is the data that we're going to scrape.

To control our geolocation, we use the ScrapeOps country param. If we pass "country": "us", ScrapeOps will route us through a server in the US. If we pass "uk" in as our country, ScrapeOps will route us through the UK.

To get started, create a new project folder and move into it:

mkdir tiktok-scraper
cd tiktok-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
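If you want to confirm both installs worked before writing any scraper code, a quick import check like the one below (entirely optional) is enough.

# Quick sanity check that both dependencies are importable.
import requests
from bs4 import BeautifulSoup

print(requests.__version__)
print(BeautifulSoup("<p>ok</p>", "html.parser").text)  # prints "ok"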
To extract our profile data, we need to pull a script (JavaScript) element from the page. Embedded within this JavaScript is a JSON blob. The JSON blob holds all sorts of interesting information about the channel.

Along with some basic structure and retry logic, this script does exactly that. Take a look at the Python script below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_channel(channel_name, location, retries=3):
    url = f"https://www.tiktok.com/@{channel_name}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
            json_data = json.loads(script_tag.text)
            user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]

            stats = user_info["stats"]
            follower_count = stats["followerCount"]
            likes = stats["heartCount"]
            video_count = stats["videoCount"]

            user_data = user_info["user"]
            unique_id = user_data["uniqueId"]
            nickname = user_data["nickname"]
            verified = user_data["verified"]
            signature = user_data["signature"]

            profile_data = {
                "name": unique_id,
                "follower_count": follower_count,
                "likes": likes,
                "video_count": video_count,
                "nickname": nickname,
                "verified": verified,
                "signature": signature
            }

            print(profile_data)

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(channel_list, location, max_threads=5, retries=3):
    for channel in channel_list:
        scrape_channel(channel, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Scrape starting...")

    ## INPUT ---> List of keywords to scrape
    channel_list = [
        "paranormalpodcast",
        "theparanormalfiles",
        "jdparanormal",
        "paranormal.com7",
        "paranormal064",
        "marijoparanormal",
        "paranormal_activityghost",
        "youtube_paranormal",
        "paranormal140",
        "paranormal.51"
    ]

    ## Job Processes
    start_scrape(channel_list, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Scrape complete.")
While we still have tries left and the operation has not succeeded:

- We find the embedded data with soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']").
- json.loads(script_tag.text) converts the text of the script object into a dict we can index from Python (a standalone sketch of this step follows this list).
- From that dict we pull our name, follower_count, likes, video_count, nickname, verified, and signature fields.
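Here is a minimal, standalone sketch of that extraction step; the html string is a stripped-down stand-in for the real page source, which is far larger.

from bs4 import BeautifulSoup
import json

# A tiny stand-in for the real page source (the real blob is much larger).
html = """
<html><body>
<script id="__UNIVERSAL_DATA_FOR_REHYDRATION__">
{"__DEFAULT_SCOPE__": {"webapp.user-detail": {"userInfo": {"stats": {"followerCount": 12345}}}}}
</script>
</body></html>
"""

soup = BeautifulSoup(html, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
json_data = json.loads(script_tag.text)

# Once loaded, the blob is just a nested Python dict.
stats = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]["stats"]
print(stats["followerCount"])  # 12345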
To store this data properly, we need two classes: ProfileData and DataPipeline.

- ProfileData is used specifically for holding information from the profiles we scrape.
- The DataPipeline object takes a dataclass (in this case ProfileData) and pipes it into a CSV file while removing duplicates.

Here is our ProfileData class.

@dataclass
class ProfileData:
    name: str = ""
    follower_count: int = 0
    likes: int = 0
    video_count: int = 0
    nickname: str = ""
    verified: bool = False
    signature: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
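As a quick illustration of what __post_init__ does for us, here is a small hypothetical usage example showing the empty-string defaults and whitespace stripping; the values are made up.

# Assumes the ProfileData class above has already been defined.
profile = ProfileData(name="  paranormalpodcast  ", follower_count=1000)

print(profile.name)       # "paranormalpodcast" -- whitespace stripped
print(profile.nickname)   # "No nickname" -- empty strings get a default
print(profile.signature)  # "No signature"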
Here is our DataPipeline.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
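Before wiring the pipeline into the scraper, here is a small hypothetical end-to-end usage on its own; the filename and channel names are just examples.

# Standalone demo (assumes ProfileData, DataPipeline, and logger are defined above).
pipeline = DataPipeline(csv_filename="demo-profiles.csv", storage_queue_limit=10)

pipeline.add_data(ProfileData(name="paranormalpodcast", follower_count=1000))
pipeline.add_data(ProfileData(name="jdparanormal", follower_count=500))
pipeline.add_data(ProfileData(name="paranormalpodcast"))  # duplicate name -> logged and dropped

# Nothing has hit the disk yet (queue limit not reached), so flush manually.
pipeline.close_pipeline()
# demo-profiles.csv now holds a header row plus two data rows.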
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, retries=3): for channel in channel_list: scrape_channel(channel, location, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Scrape starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Scrape complete.")
Next, we'll use ThreadPoolExecutor to spawn scrape_channel() on multiple threads. This will greatly increase our speed and efficiency. The code snippet below replaces our for loop and runs scrape_channel() with ThreadPoolExecutor.

def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_channel,
            channel_list,
            [location] * len(channel_list),
            [data_pipeline] * len(channel_list),
            [retries] * len(channel_list)
        )
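If the [location] * len(channel_list) style arguments look unusual, here is a tiny standalone sketch (with a made-up greet() worker) of how executor.map() consumes several iterables in lockstep; the real arguments are broken down right after.

import concurrent.futures

# Made-up worker just for illustration.
def greet(name, location, retries):
    return f"{name} / {location} / {retries}"

names = ["a", "b", "c"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        greet,
        names,
        ["uk"] * len(names),   # same location for every call
        [3] * len(names)       # same retry count for every call
    )
    print(list(results))  # ['a / uk / 3', 'b / uk / 3', 'c / uk / 3']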
Pay attention to the arguments we pass into executor.map():

- scrape_channel tells executor to run scrape_channel() on every available thread.
- channel_list is the list of channels we want to pass into scrape_channel().
- We pass location, data_pipeline, and retries
in as arrays to be passed to each individual thread.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Scrape starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Scrape complete.")
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
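As a quick sanity check, you can print the proxied URL for one of our channels; the example output below is only an illustration of the shape, and your api_key value will differ.

# Assumes API_KEY has already been loaded from config.json above.
proxied = get_scrapeops_url("https://www.tiktok.com/@paranormalpodcast", location="uk")
print(proxied)
# e.g. https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fwww.tiktok.com%2F%40paranormalpodcast&country=uk&residential=True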
get_scrapeops_url() takes in a number of arguments and converts any URL into a ScrapeOps proxied URL. Here are the individual arguments:

- "api_key": your ScrapeOps API key.
- "url": the URL you'd like to scrape.
- "country": the location you'd like to be routed through.
- "residential": a boolean value. When we set residential to True
, we're telling ScrapeOps that we want a residential IP address. Anti-bots are far less likely to block a residential IP than a data center IP.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Scrape starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Scrape complete.")
Take a look at our main. MAX_RETRIES is set to 3, MAX_THREADS is set to 5, and our location is set to "uk". Feel free to change any of these constants.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Scrape starting...")

    ## INPUT ---> List of keywords to scrape
    channel_list = [
        "paranormalpodcast",
        "theparanormalfiles",
        "jdparanormal",
        "paranormal.com7",
        "paranormal064",
        "marijoparanormal",
        "paranormal_activityghost",
        "youtube_paranormal",
        "paranormal140",
        "paranormal.51"
    ]

    ## Job Processes
    crawl_pipeline = DataPipeline(csv_filename="channels.csv")
    start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Scrape complete.")
Next, we'll build the content scraper. We'll add a wait parameter to the ScrapeOps URL, and we'll pull some data out of some incredibly nested elements. This scraper needs to find each post link on the channel page, pull its name, url, and views, and then save that data.

Here is our scrape_channel_content() function. It looks a lot like our first parsing function.

def scrape_channel_content(row, location, retries):
    url = f"https://www.tiktok.com/@{row['name']}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            main_content = soup.select_one("div[id='main-content-others_homepage']")
            links = main_content.find_all("a")

            for link in links:
                href = link.get("href")
                if row["name"] not in href:
                    continue

                views = 0
                views_present = link.select_one("strong[data-e2e='video-views']")
                if views_present:
                    views = views_present.text

                video_data = {
                    "name": href.split("/")[-1],
                    "url": href,
                    "views": views
                }

                print(video_data)

            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
Inside the parsing function:

- We find all of the link elements with main_content.find_all("a").
- From each link element we pull the views, url, and name (id number).

Now we need a function similar to start_scrape(). We'll call this one process_results(). This function will read our CSV into an array object. Then it will iterate through all the rows of the array and call scrape_channel_content() on them.

Here is process_results().

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            scrape_channel_content(row, location, retries=retries)
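Each row handed to scrape_channel_content() is just a dict keyed by the crawler's CSV header, so row["name"] holds the channel's uniqueId. Here is a small sketch of that shape, assuming channels.csv already exists from the crawl.

import csv

# Assumes channels.csv was produced by the crawler above.
with open("channels.csv", newline="") as file:
    reader = list(csv.DictReader(file))

print(reader[0]["name"])  # e.g. "paranormalpodcast"
print(list(reader[0].keys()))
# ['name', 'follower_count', 'likes', 'video_count', 'nickname', 'verified', 'signature']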
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text video_data = { "name": href.split("/")[-1], "url": href, "views": views } print(video_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: 
scrape_channel_content(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, retries=MAX_RETRIES) logger.info("Content scrape complete")
process_results() reads our CSV file into an array. It then runs scrape_channel_content() on each row from the file.

Since we already have a working DataPipeline, we just need a dataclass to pass into it. We'll call this one VideoData. This class will hold the following:

- name: the unique number given to the photo or video.
- url: the url where we can find the photo or video.
- views: the amount of views that the photo or video has received.

@dataclass
class VideoData:
    name: str = ""
    url: str = ""
    views: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
Inside our parsing function, we now open a new DataPipeline and pass VideoData
objects into it.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") soup = BeautifulSoup(response.text, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, retries=3): logger.info(f"processing 
{csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: scrape_channel_content(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, retries=MAX_RETRIES) logger.info("Content scrape complete")
This new dataclass gives us almost everything we need to properly scrape the content from all of these channels.

Next, we'll adjust process_results() to take advantage of multithreading with ThreadPoolExecutor.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                scrape_channel_content,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
In this new process_results() function, we now pass scrape_channel_content in as our first argument. Then we pass in the reader object (an array of dict objects we want to process). location and retries both get passed in as arrays as well.

Finally, back to get_scrapeops_url(). Before we call it again in our code, we're going to add one more argument to it, "wait": 2000. This will tell the ScrapeOps server to wait 2 seconds for content to render before sending it back to us. We need to do this so that the videos and photos from these channels can be fetched and loaded into the page.

Here is our finished proxy function.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "residential": True,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url

To use it, we once again swap the proxied URL in when making our request:

scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "residential": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") soup = BeautifulSoup(response.text, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 if not success: raise Exception(f"Max Retries exceeded: {retries}") def 
process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel_content, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal", "paranormal140", "paranormal.51" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
Once again, here is the main we'll be working with. Feel free to change any of the following constants:

- MAX_RETRIES
- MAX_THREADS
- LOCATION
- channel_list

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "uk"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    channel_list = [
        "paranormalpodcast",
        "theparanormalfiles",
        "jdparanormal",
        "paranormal.com7",
        "paranormal064",
        "marijoparanormal",
        "paranormal_activityghost",
        "youtube_paranormal",
        "paranormal140",
        "paranormal.51"
    ]

    ## Job Processes
    crawl_pipeline = DataPipeline(csv_filename="channels.csv")
    start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")

    logger.info("Starting content scrape...")
    process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    logger.info("Content scrape complete")
When scraping any site, always pay attention to its terms of service and robots.txt. You may view TikTok's terms here, and their robots.txt is available here.
It's important to examine both of these files because violating them can get you blocked or even permanently banned.
If you are unsure of the legality of a scraping project, public data (data not behind a login) is generally considered public information and is therefore fair game when scraping. If your data is gated behind a login or some other type of authentication, it is considered private property, and you will be subject to individual privacy and intellectual property laws. Any time you're not sure whether your data is public or private, consult an attorney.
Then check out ScrapeOps, the complete toolkit for web scraping.
config.json
file.import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "json_response": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_page = html.unescape(page["body"]) soup = BeautifulSoup(decoded_page, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href or "https://www.tiktok.com" not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text name = href.split("/")[-1] try: int(name) except: logger.info(f"Ad found, skipping item: {name}") continue video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while 
processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel_content, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
Our tunable settings all live inside of main:
MAX_RETRIES: Defines the maximum number of times the script will attempt to scrape a URL if an error occurs during the process.
MAX_THREADS: Specifies the maximum number of concurrent threads that can run at the same time during the scraping process.
LOCATION: Defines the geographical location to simulate while sending requests to the target website.
channel_list: A list of TikTok channel usernames that the script is set to scrape.
Each channel page uses the same URL format. For example, https://www.tiktok.com/@paranormalpodcast follows the pattern https://www.tiktok.com/@{name_of_channel}.
Each channel page embeds a script (JavaScript) element packed with all the necessary data to load the channel page. If you look in the screenshot below, you can see evidence of this. You might notice the id: __UNIVERSAL_DATA_FOR_REHYDRATION__. With this unique information, we can write a CSS selector to find this tag on the page: script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__'].
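To make that concrete, here is a minimal, standalone sketch of pulling the rehydration blob out of a page you've already downloaded. The variable page_html and the helper name extract_user_info() are placeholders for illustration; the selector and JSON keys are the ones used throughout this article.

import json
from bs4 import BeautifulSoup

def extract_user_info(page_html):
    # page_html: raw HTML of a TikTok channel page, fetched however you like
    soup = BeautifulSoup(page_html, "html.parser")
    # Grab the embedded rehydration data
    script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
    json_data = json.loads(script_tag.text)
    # Profile details and stats live under this path
    return json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]

From the returned dict, user_info["stats"]["followerCount"], ["heartCount"], and ["videoCount"] give you the profile's numbers, exactly as the parser in this article does.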
Each video link on the page is an a element, but the CSS classes here are jumbled nonsense. To extract these, we'll have to get a bit more creative than a plain CSS selector. If you look below, the href of each element has a pretty uniform structure. These videos are laid out like this: https://www.tiktok.com/@paranormalpodcast/video/7423989765457857798. We can use these href values to filter our links against the pattern https://www.tiktok.com/@{name_of_channel}/video/{id_number}. A short sketch of this filter follows below.
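As a quick illustration of that filter (the channel name and the list of hrefs below are just sample values), we keep only the links that match the video pattern:

channel_name = "paranormalpodcast"
hrefs = [
    "https://www.tiktok.com/@paranormalpodcast/video/7423989765457857798",
    "https://www.example.com/sponsored-link",  # off-pattern links get dropped
]

video_links = []
for href in hrefs:
    # Real video links contain both the channel name and TikTok's base domain
    if channel_name not in href or "https://www.tiktok.com" not in href:
        continue
    video_links.append(href)

print(video_links)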
Proxy Aggregator gives us geotargeting through the country parameter. We simply need to pass a country code into this param. Proxy Aggregator reads the country code and routes the request through our desired country. If we wish to show up in the US, our country code would be us (a short usage example follows the table below).

Country | Country Code |
---|---|
Brazil | br |
Canada | ca |
China | cn |
India | in |
Italy | it |
Japan | jp |
France | fr |
Germany | de |
Russia | ru |
Spain | es |
United States | us |
United Kingdom | uk |
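For example, to route requests through the UK (the LOCATION used later in this article), we just pass uk as the country code. This is a trimmed-down sketch: only api_key, url, and country are shown, and the helper name proxied() is purely illustrative; the full get_scrapeops_url() used in this article adds wait and json_response as well.

from urllib.parse import urlencode

API_KEY = "YOUR-SUPER-SECRET-API-KEY"  # placeholder, load yours from config.json

def proxied(url, location="us"):
    payload = {"api_key": API_KEY, "url": url, "country": location}
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)

# Appear to TikTok as a visitor from the United Kingdom
print(proxied("https://www.tiktok.com/@paranormalpodcast", location="uk"))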
Let's get started with a new project. First, create a new project folder and cd into the folder.

mkdir tiktok-selenium
cd tiktok-selenium

Next, create and activate a new virtual environment.

python -m venv venv
source venv/bin/activate

Finally, install the dependencies. The code in this article imports both Selenium and BeautifulSoup, so install beautifulsoup4 alongside selenium.

pip install selenium
pip install beautifulsoup4
In the code below, scrape_channel() performs our actual scraping logic, start_scrape() is used to trigger a scrape, and our runtime is held inside the main
block.import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_channel(channel_name, location, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = { "name": unique_id, "follower_count": follower_count, "likes": likes, "video_count": video_count, "nickname": nickname, "verified": verified, "signature": signature } print(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, retries=3): for channel in channel_list: scrape_channel(channel, location, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes start_scrape(channel_list, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
A few lines here deserve close attention:
driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") finds our JSON response from Proxy Aggregator. This will not actually work until we've hooked up the proxy. We take our response as JSON to prevent the page from being altered and rendered; if Selenium renders the page, we get an automatic redirect that skips over our target page.
html.unescape(page["body"]) decodes the HTML body of the response, which we then load into BeautifulSoup for parsing. Selenium's extraction features are built to interact with the page, and in this case, we don't want to interact with it. We want to keep it intact.
soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") finds the embedded data tag. We use BeautifulSoup for processing; this allows us to dig through the data without any page interactions that could potentially alter the page or redirect us.
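Condensed into one helper, those decode steps look like this. The function name decode_proxy_response() and the raw_pre_text argument are placeholders for illustration; the calls themselves mirror the article's code.

import html
import json
from bs4 import BeautifulSoup

def decode_proxy_response(raw_pre_text):
    # raw_pre_text: the innerHTML of the <pre> tag Selenium finds on the proxy response page
    page = json.loads(raw_pre_text)              # Proxy Aggregator's JSON envelope
    decoded_chunk = html.unescape(page["body"])  # the original TikTok HTML, unescaped
    soup = BeautifulSoup(decoded_chunk, "html.parser")
    script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
    return json.loads(script_tag.text)           # the rehydration data as a Python dict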
Right now, we simply print each scraped profile as a plain dict. This is great when we're just trying to get things working, but these data structures don't cover edge cases. For production use, we should use custom, strongly typed objects. We also need a way to pipe these objects into a CSV file.
This is our ProfileData class. We'll use it to replace the dict we used earlier. If a string field is missing from this object, it gets replaced by a default value: No {field.name}. Especially when dealing with data used by all sorts of moving parts (people, crawlers, scrapers, etc.), this is a much safer approach to prevent corruption.

@dataclass
class ProfileData:
    name: str = ""
    follower_count: int = 0
    likes: int = 0
    video_count: int = 0
    nickname: str = ""
    verified: bool = False
    signature: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
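If you want to see the default handling in isolation, here is a quick sketch (the values are made up) assuming the ProfileData class above is in scope:

profile = ProfileData(name="paranormalpodcast", follower_count=1000, likes=5000)
print(profile.nickname)   # "No nickname": empty string fields get a default value
print(profile.signature)  # "No signature"
print(profile.name)       # "paranormalpodcast": non-empty strings are just stripped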
To store these objects properly, we need a DataPipeline. While it's waiting to be saved, our data gets held in a storage_queue. is_duplicate() allows us to filter out duplicate items from the pipeline. When we close our pipeline, we sleep for 3 seconds to wait for any other operations to complete, then save the storage_queue to a CSV file.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)  # note: the full script needs "import time" for this call
        if len(self.storage_queue) > 0:
            self.save_to_csv()
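Here is a minimal usage sketch (the filename and values are illustrative) assuming the DataPipeline and ProfileData classes above are in scope:

pipeline = DataPipeline(csv_filename="example-channels.csv")
pipeline.add_data(ProfileData(name="paranormalpodcast", follower_count=1000))
pipeline.add_data(ProfileData(name="paranormalpodcast"))  # duplicate name: logged and dropped
pipeline.close_pipeline()  # flushes whatever is left in storage_queue to the CSV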
In the full code below, instead of printing a dict to the console, we create a ProfileData object. Then, we pass it into the DataPipeline
for safe and effective storage.import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, retries=3): for channel in channel_list: scrape_channel(channel, location, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
start_scrape() already takes in a list of channels to scrape. However, we iterate through them using a for loop. This isn't the most efficient process because our crawler needs to look them up one at a time. With ThreadPoolExecutor, we can parse multiple channels at the same time.
Here is our rewritten trigger function. The real magic comes from executor.map(). Take a look at the args we pass into it:
scrape_channel: The function we want to call on each thread.
channel_list: The list of channels we actually wish to crawl.
All remaining args get passed in as lists the same length as channel_list. executor.map() then takes each arg from its respective list and passes it into a separate instance of scrape_channel.

def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_channel,
            channel_list,
            [location] * len(channel_list),
            [data_pipeline] * len(channel_list),
            [retries] * len(channel_list)
        )
import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
Aside from our standard parameters (api_key, url), we have a couple of other parameters we need to get the right response from TikTok. We pass an arbitrary number of milliseconds into wait, and Proxy Aggregator will wait that much time for the page to render. We also need to set json_response to True. This is imperative so we can freeze the page. When we receive our JSON response, we pass the body into BeautifulSoup so we can parse the page non-interactively.
Here is the function that makes everything work.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "json_response": True,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "json_response": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
Once again, our runtime is held inside main. Feel free to change any of the following to customize your results: MAX_RETRIES, MAX_THREADS, LOCATION, channel_list.
if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
Remember, each of these requests includes a wait of 2 seconds per page, so this level of speed is almost unfathomable!
Next up is scrape_channel_content(), our new parser. Like we did earlier, this function is built to take a JSON response from Proxy Aggregator in order to freeze the page and prevent it from interacting with the browser. First, we find our main_content. From there, we gather all of our links
.All links must contain the profile name and TikTok's base domain. If they don't they're likely ads or sponsored posts.def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_page = html.unescape(page["body"]) soup = BeautifulSoup(decoded_page, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href or "https://www.tiktok.com" not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text name = href.split("/")[-1] try: int(name) except: logger.info(f"Ad found, skipping item: {name}") continue video_data = { "name": href.split("/")[-1], "url": href, "views": views } print(video_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}")
Let's break down the key steps in the parser:
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") pulls the JSON response from the page containing the API response.
We convert it to a Python dict with json.loads(), then take the body from the response and load it into BeautifulSoup for static processing. We cannot load it into the browser, or we will get redirected.
main_content.find_all("a") finds all of our links. We filter out any bad links that don't contain the proper information.
views gets a default value of 0. link.select_one("strong[data-e2e='video-views']") checks to see if the video has views. If it does, we save the text of this element to views.
name, the id number of the video, gets extracted from the href of the link.
The sketch below condenses these per-link checks into a single helper.
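The helper name parse_video_link() is illustrative; link is assumed to be one of the BeautifulSoup a tags returned by main_content.find_all("a").

def parse_video_link(link, channel_name):
    href = link.get("href")
    # Keep only links that contain the channel name and TikTok's base domain
    if channel_name not in href or "https://www.tiktok.com" not in href:
        return None
    views = 0
    views_present = link.select_one("strong[data-e2e='video-views']")
    if views_present:
        views = views_present.text
    # Real videos end in a numeric id; ads and promos don't
    name = href.split("/")[-1]
    try:
        int(name)
    except ValueError:
        return None
    return {"name": name, "url": href, "views": views}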
Our crawler writes its results to a CSV file. Now we need a function that reads that file and runs the parser on each row; process_results() does exactly that. With process_results(), we read the CSV file into an array of dict objects. We then iterate through them with a for loop and call scrape_channel_content() on each row. Later on, we'll replace the for loop and add concurrency.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            scrape_channel_content(row, location, retries)
import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "json_response": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_page = html.unescape(page["body"]) soup = BeautifulSoup(decoded_page, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href or "https://www.tiktok.com" not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text name = href.split("/")[-1] try: int(name) except: logger.info(f"Ad found, skipping item: {name}") continue video_data = { "name": href.split("/")[-1], "url": href, "views": views } print(video_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: 
{retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: scrape_channel_content(row, location, retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
We already have a working DataPipeline; we just need to feed it some dataclass objects. To do this, we'll write one more class. It will be similar to our ProfileData class from earlier. It will have the same methods, but our fields will be a bit different.
Take a look at VideoData. We use it to hold the name, url, and views for each video we scrape.

@dataclass
class VideoData:
    name: str = ""
    url: str = ""
    views: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
Next, we open a DataPipeline from inside scrape_channel_content(). We feed our VideoData
into it until we're done scraping. Once we've completed the parse, we close the pipeline.import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "json_response": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url) ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_page = html.unescape(page["body"]) soup = BeautifulSoup(decoded_page, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href or "https://www.tiktok.com" not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text name = href.split("/")[-1] try: int(name) except: logger.info(f"Ad found, skipping item: {name}") continue video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, 
retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: scrape_channel_content(row, location, retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
The last step is to remove the for loop in our trigger function. ThreadPoolExecutor is an excellent tool for this.
Our rewritten function is in the snippet below. Our first arg is the function we wish to call, scrape_channel_content. Next, we pass in our CSV file data with reader. All other args get passed in as lists the length of reader.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                scrape_channel_content,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
To route the content scraper through the proxy, we only need to change the driver.get() line from the parser:

scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
import osimport csvimport jsonimport loggingimport htmlfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom bs4 import BeautifulSoupfrom urllib.parse import urlencodeimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "json_response": True, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass ProfileData: name: str = "" follower_count: int = 0 likes: int = 0 video_count: int = 0 nickname: str = "" verified: bool = False signature: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass VideoData: name: str = "" url: str = "" views: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_channel(channel_name, location, data_pipeline=None, retries=3): url = f"https://www.tiktok.com/@{channel_name}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_chunk = html.unescape(page["body"]) soup = BeautifulSoup(decoded_chunk, "html.parser") script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']") json_data = json.loads(script_tag.text) user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"] stats = user_info["stats"] follower_count = stats["followerCount"] likes = stats["heartCount"] video_count = stats["videoCount"] user_data = user_info["user"] unique_id = user_data["uniqueId"] nickname = user_data["nickname"] verified = user_data["verified"] signature = user_data["signature"] profile_data = ProfileData( name=unique_id, follower_count=follower_count, likes=likes, video_count=video_count, nickname=nickname, verified=verified, signature=signature ) data_pipeline.add_data(profile_data) success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel, channel_list, [location] * len(channel_list), [data_pipeline] * len(channel_list), [retries] * len(channel_list) ) def scrape_channel_content(row, location, retries): url = f"https://www.tiktok.com/@{row['name']}" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) driver.get(scrapeops_proxy_url) ## Extract Data video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv") json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") page = json.loads(json_stuff) decoded_page = html.unescape(page["body"]) soup = BeautifulSoup(decoded_page, "html.parser") main_content = soup.select_one("div[id='main-content-others_homepage']") links = main_content.find_all("a") for link in links: href = link.get("href") if row["name"] not in href or "https://www.tiktok.com" not in href: continue views = 0 views_present = link.select_one("strong[data-e2e='video-views']") if views_present: views = views_present.text name = href.split("/")[-1] try: int(name) except: logger.info(f"Ad found, skipping item: {name}") continue video_data = VideoData( name=href.split("/")[-1], url=href, views=views ) video_pipeline.add_data(video_data) success = True video_pipeline.close_pipeline() except Exception as e: logger.error(f"An error occurred while 
processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_channel_content, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")
As before, feel free to change MAX_RETRIES, MAX_THREADS, LOCATION, and channel_list
.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "uk" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape channel_list = [ "paranormalpodcast", "theparanormalfiles", "jdparanormal", "paranormal.com7", "paranormal064", "marijoparanormal", "paranormal_activityghost", "youtube_paranormal" ] ## Job Processes crawl_pipeline = DataPipeline(csv_filename="channels.csv") start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting content scrape...") process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Content scrape complete")