How to Scrape TikTok With Selenium and BeautifulSoup
TikTok is one of the most notoriously difficult sites to scrape. It employs a sophisticated system of dynamic content and redirects that makes it nearly impossible to scrape... even with a proxy! That said, with the right approach, you can still scrape TikTok.
In today's tutorial, we're going to crawl TikTok profile data. Then, we'll scrape the videos from those channels.
- TLDR: How To Scrape TikTok
- How To Architect Our Scraper
- Understanding What To Scrape
- Setting Up
- Building Our Crawler
- Building Our Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape TikTok
If you need to scrape TikTok, look no further. This code contains both a profile crawler and a video scraper.
- Create a new project folder.
- Inside that folder, add your ScrapeOps API key to a config.json file (see the sketch after this list).
- Then, paste the code below into a new Python file.
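Before running it, note that the script reads a single api_key field from config.json. Here is a minimal sketch that creates such a file; replace the placeholder with your own ScrapeOps API key.

import json

# Minimal sketch: write a config.json that the scraper below can read.
# "YOUR-SCRAPEOPS-API-KEY" is a placeholder, not a real key.
config = {"api_key": "YOUR-SCRAPEOPS-API-KEY"}

with open("config.json", "w") as config_file:
    json.dump(config, config_file, indent=4)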
import os
import csv
import json
import logging
import html
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"json_response": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class VideoData:
name: str = ""
url: str = ""
views: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
## Extract Data
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_chunk = html.unescape(page["body"])
soup = BeautifulSoup(decoded_chunk, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]
follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]
user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]
profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)
data_pipeline.add_data(profile_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)
def scrape_channel_content(row, location, retries):
url = f"https://www.tiktok.com/@{row['name']}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
## Extract Data
video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_page = html.unescape(page["body"])
soup = BeautifulSoup(decoded_page, "html.parser")
main_content = soup.select_one("div[id='main-content-others_homepage']")
links = main_content.find_all("a")
for link in links:
href = link.get("href")
if row["name"] not in href or "https://www.tiktok.com" not in href:
continue
views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
views = views_present.text
name = href.split("/")[-1]
try:
int(name)
except:
logger.info(f"Ad found, skipping item: {name}")
continue
video_data = VideoData(
name=href.split("/")[-1],
url=href,
views=views
)
video_pipeline.add_data(video_data)
success = True
video_pipeline.close_pipeline()
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel_content,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal"
]
## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
logger.info("Starting content scrape...")
process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Content scrape complete")
Feel free to change any of the following from main:
- MAX_RETRIES: Defines the maximum number of times the script will attempt to scrape a URL if an error occurs during the process.
- MAX_THREADS: Specifies the maximum number of concurrent threads that can run at the same time during the scraping process.
- LOCATION: Defines the geographical location to simulate while sending requests to the target website.
- channel_list: A list of TikTok channel usernames that the script is set to scrape.
How To Architect Our TikTok Scraper
Our TikTok scraper follows the same high-level architecture as most of the other projects we've done in this series. We'll create both a crawler and a scraper. Our crawler will find general information on a list of channels.
Afterward, we'll learn how to scrape aggregate video data from each channel. Through a series of iterations, we'll add the following features to each component.
- Parsing: The ability to dig through the HTML and extract our target data.
- Data Storage: Once our data's been extracted, it needs to be saved to a CSV file.
- Concurrency: We should be able to run the steps above on multiple pages at the same time. This makes our runtime far more efficient.
- Proxy Integration: To keep from getting blocked, a decent proxy connection is imperative.
Understanding How To Scrape TikTok Pages
Step 1: How To Request TikTok Pages
Most TikTok data (including their search page) is only available if you're logged in. To get around this, we need to know our channel names.
The most important part of the screenshot below is the URL:
https://www.tiktok.com/@paranormalpodcast
All of our URLs are laid out like this:
https://www.tiktok.com/@{name_of_channel}
Step 2: How To Extract Data From TikTok Pages
TikTok renders pretty much everything dynamically. In order to load the channel, it uses a script
(JavaScript) element packed with all the necessary data to load the channel page.
If you look in the screenshot below, you can see evidence of this. You might notice the id: __UNIVERSAL_DATA_FOR_REHYDRATION__.
With this unique information, we can write a CSS selector to find this tag on the page: script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__'].
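To make that concrete, here is a rough sketch of the extraction step. extract_profile_stats is a hypothetical helper (not part of the final script) that assumes you already have the profile page's HTML as a string; the same selector and JSON path appear in the full crawler later in this tutorial.

import json
from bs4 import BeautifulSoup

def extract_profile_stats(html_text: str) -> dict:
    # Parse the raw profile HTML and locate the rehydration script tag.
    soup = BeautifulSoup(html_text, "html.parser")
    script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
    # The tag's text is a JSON blob containing the channel's data.
    json_data = json.loads(script_tag.text)
    user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
    # stats holds followerCount, heartCount, and videoCount.
    return user_info["stats"]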
All of our videos on the page are embedded inside a elements (anchor tags). The CSS classes here are jumbled nonsense, so to extract these links, we'll have to get a bit more creative than just a CSS selector.
As you can see below, the href of each video element has a pretty uniform structure. These videos are laid out like this:
https://www.tiktok.com/@paranormalpodcast/video/7423989765457857798
We'll have to use these href
values to filter our links:
https://www.tiktok.com/@{name_of_channel}/video/{id_number}
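To illustrate the filtering we'll do later, here is a small sketch. is_video_link is a hypothetical helper showing the checks the scraper applies to each href: the link must contain the channel name and TikTok's domain, and it must end in a numeric video id.

def is_video_link(href: str, channel_name: str) -> bool:
    # Filter out anything that isn't a video URL for this channel.
    if channel_name not in href or "https://www.tiktok.com" not in href:
        return False
    # Real videos end in a numeric id; ads and promoted posts do not.
    video_id = href.split("/")[-1]
    return video_id.isdigit()

# Example using the URL structure shown above.
print(is_video_link(
    "https://www.tiktok.com/@paranormalpodcast/video/7423989765457857798",
    "paranormalpodcast"
))  # True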
Step 3: Geolocated Data
With geotargeting, we get to choose which country our requests come from. Proxy Aggregator gives us an effective proxy solution and we don't have to worry about managing pools or individual proxy connections.
Proxy Aggregator takes in a country
parameter. We simply need to pass a country code into this param. Proxy Aggregator reads the country code and routes the request through our desired country.
If we wish to show up in the US, our country code would be us.
Country | Country Code |
---|---|
Brazil | br |
Canada | ca |
China | cn |
India | in |
Italy | it |
Japan | jp |
France | fr |
Germany | de |
Russia | ru |
Spain | es |
United States | us |
United Kingdom | uk |
For more information about Proxy Aggregator's geotargeting abilities, you can view the docs here.
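In practice, the country code is just another key in the proxy payload. Here is a minimal sketch of how a geotargeted Proxy Aggregator URL gets built; the full get_scrapeops_url() function appears later in this tutorial.

from urllib.parse import urlencode

# Sketch: route a request for a TikTok profile through the UK.
payload = {
    "api_key": "YOUR-SCRAPEOPS-API-KEY",  # placeholder
    "url": "https://www.tiktok.com/@paranormalpodcast",
    "country": "uk",
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
print(proxy_url)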
Setting Up Our TikTok Scraper Project
Now, we need to create our project. Follow the steps below to create a new project folder, install Selenium, and get your project ready for coding.
Create a new project folder and cd
into the folder.
mkdir tiktok-selenium
cd tiktok-selenium
Create a virtual environment.
python -m venv venv
Activate the environment.
source venv/bin/activate
Install Selenium.
pip install selenium
**Make sure you have a webdriver installed.** You can find the latest version here.
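If you'd like to confirm your setup before moving on, a quick smoke test like the one below should open a headless browser and print a page title. This assumes Chrome and a matching driver are available on your machine.

# Sanity check: confirm Selenium can launch headless Chrome.
from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
driver.get("https://www.example.com")
print(driver.title)  # should print "Example Domain"
driver.quit()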
Build A TikTok Search Crawler
Due to the site's complexity, crawling TikTok requires more manual intervention than the other crawlers we've built.
In order to scrape a TikTok page, we need to know its channel name. You can follow along with our code, but when you're performing your own crawl, you'll need to generate a list of channels beforehand.
Follow these steps and you'll be able to crawl any list of TikTok channels.
Step 1: Create Simple Search Data Parser
This is unconventional, but our crawler is actually going to use both BeautifulSoup and Selenium. Because of TikTok's complex redirect system, it's pretty much impossible to directly open the page from inside Selenium.
To account for this, we'll use Proxy Aggregator to fetch the page and actually take our response as JSON. The proxy integration will happen later on in the tutorial, so don't expect the code to work 100% just yet.
In the code below, we have the basic skeleton for our project. We have a few different functions. scrape_channel()
performs our actual scraping logic. start_scrape()
is used to trigger a scrape. Our runtime is held inside the main
block.
import os
import csv
import json
import logging
import html
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
driver.get(url)
## Extract Data
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_chunk = html.unescape(page["body"])
soup = BeautifulSoup(decoded_chunk, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]
follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]
user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]
profile_data = {
"name": unique_id,
"follower_count": follower_count,
"likes": likes,
"video_count": video_count,
"nickname": nickname,
"verified": verified,
"signature": signature
}
print(profile_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(channel_list, location, data_pipeline=None, retries=3):
for channel in channel_list:
scrape_channel(channel, location, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal"
]
## Job Processes
start_scrape(channel_list, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
Our parsing function here is unconventional.
- driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") finds our JSON response from Proxy Aggregator. This will not actually work until we've hooked up the proxy. We take our response as JSON to prevent the page from being altered and rendered. If Selenium renders the page, we get an automatic redirect that skips over our target page.
- Not only do we need to freeze the page in a JSON response, the page also comes obscured and full of escape characters. To make our HTML readable, we use html.unescape(page["body"]).
- Once the page data is finally readable, we pass it into BeautifulSoup for parsing. Selenium's extraction features are built to interact with the page, and in this case, we don't want to interact with it. We want to keep it intact.
- After we've gotten past all these hurdles, we read the JSON from the page with soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']").
- From there, we just need to index the JSON to retrieve our data.
Channel data is very cleverly hidden on the page. If we're careful, we can preserve the data and load it into BeautifulSoup
for processing. This allows us to dig through the data without any page interactions that could potentially alter the page or redirect us.
Step 2: Storing the Scraped Data
Once we've extracted our channel data, we need to store it. In the example above, we loaded the data into a dict
. This is great when we're just trying to get things working, but these data structures don't cover edge cases.
For production use, we should use custom, strongly typed objects. We also need a way to pipe these objects into a CSV file.
This is our ProfileData
class. We'll use it to replace the dict
we used earlier. If a field is missing in this object, it gets replaced by a default value: No {field.name}
.
Especially when dealing with data used by all sorts of moving parts (people, crawlers, scrapers, etc.), this is a much safer approach to prevent corruption.
@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Next, we need a pipeline to a CSV file. The class below is our DataPipeline. While it's waiting to be saved, our data gets held in a storage_queue. is_duplicate() allows us to filter duplicate items out of the pipeline. When we close the pipeline, we sleep for 3 seconds to wait for any other operations to complete, and then we save the storage_queue to a CSV file.
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
In our full code, instead of printing a dict
to the console, we create a ProfileData
object. Then, we pass it into the DataPipeline
for safe and effective storage.
import os
import csv
import json
import logging
import html
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
driver.get(url)
## Extract Data
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_chunk = html.unescape(page["body"])
soup = BeautifulSoup(decoded_chunk, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]
follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]
user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]
profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)
data_pipeline.add_data(profile_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(channel_list, location, data_pipeline=None, retries=3):
for channel in channel_list:
scrape_channel(channel, location, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal"
]
## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
Step 3: Adding Concurrency
Our trigger function, start_scrape(), already takes in a list of channels to scrape. However, we iterate through them using a for loop. This isn't the most efficient process because our crawler looks the channels up one at a time.
With ThreadPoolExecutor
, we can parse multiple channels at the same time.
Here is our rewritten trigger function. The real magic comes from executor.map(). Take a look at the args we pass into it.
- scrape_channel: The function we want to call on each thread.
- channel_list: This is the list of channels we actually wish to crawl.
- All other args get passed in as arrays. The arrays need to be the same length as our channel_list. executor.map() then takes each arg from its respective list and passes it into a separate instance of scrape_channel.
def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)
Here is our fully updated code.
import os
import csv
import json
import logging
import html
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
driver.get(url)
## Extract Data
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_chunk = html.unescape(page["body"])
soup = BeautifulSoup(decoded_chunk, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]
follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]
user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]
profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)
data_pipeline.add_data(profile_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal"
]
## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
Step 4: Bypassing Anti-Bots
As we mentioned before, we need Proxy Aggregator here. Proxy Aggregator uses a REST API, so we need a function that takes in all of our parameters and returns a proxied URL.
Along with our regular requirements (api_key, url), we have a couple of other parameters we need to get the right response from TikTok. We pass a wait time in milliseconds into wait, and Proxy Aggregator will wait that long for the page to render. We also need to set json_response to True.
This is imperative so we can freeze the page. When we receive our JSON response, we pass the body into BeautifulSoup so we can parse the page non-interactively.
Here is the function that makes everything work.
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"json_response": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
Our full code for the finalized crawler is available below.
import os
import csv
import json
import logging
import html
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"json_response": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
## Extract Data
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_chunk = html.unescape(page["body"])
soup = BeautifulSoup(decoded_chunk, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]
follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]
user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]
profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)
data_pipeline.add_data(profile_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal"
]
## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
Step 5: Production Run
Take a look at our main
. Feel free to change any of the following to customize your results.
- MAX_RETRIES
- MAX_THREADS
- LOCATION
- channel_list
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal"
]
## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
We crawled 8 pages in 14.646 seconds! This comes out to roughly 1.83 seconds per page, which is even less than the 2-second wait we set for each page, all thanks to concurrency. This level of speed is excellent!
Build A TikTok Video Scraper
Now that we've built a crawler to gather data for a list of profiles, we need to gather data on the videos from each of those profiles.
We need to know which videos perform the best. We'll record each video's id number, URL, and view count.
While this is a rather small amount of data for any one video, amassing it across a large set of videos gives you a dataset that's much easier to analyze, as shown in the sketch below.
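For example, once the per-channel CSV files exist, you could rank a channel's videos by view count. The sketch below assumes pandas is installed, that a file like paranormalpodcast.csv was produced by the scraper, and that TikTok's abbreviated view strings (such as "1.2M") appear in the views column; views_to_number is a hypothetical helper.

import pandas as pd

def views_to_number(views) -> float:
    # Convert TikTok-style view strings ("3.4K", "1.2M") into plain numbers.
    views = str(views).strip().upper()
    multipliers = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}
    if views and views[-1] in multipliers:
        return float(views[:-1]) * multipliers[views[-1]]
    try:
        return float(views)
    except ValueError:
        return 0.0

# Hypothetical example: rank one channel's scraped videos by views.
df = pd.read_csv("paranormalpodcast.csv")
df["views_numeric"] = df["views"].apply(views_to_number)
print(df.sort_values("views_numeric", ascending=False).head())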
Step 1: Create Simple TikTok Video Data Parser
Once again, we need to start with a basic parser. Its goal is simple: collect the id number, URL, and view count of each video for the given profiles.
The code below is much like our first parser. It won't work until we add proxy support, and we once again need to get the page in a JSON response so we can prevent Selenium from rendering it.
Take a look at scrape_channel_content(), our new parser. Like we did earlier, this function is built to take a JSON response from Proxy Aggregator in order to freeze the page and keep the browser from interacting with it. First, we find our main_content. From there, we gather all of our links.
All links must contain the profile name and TikTok's base domain. If they don't, they're likely ads or sponsored posts.
def scrape_channel_content(row, location, retries):
url = f"https://www.tiktok.com/@{row['name']}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
driver.get(url)
## Extract Data
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_page = html.unescape(page["body"])
soup = BeautifulSoup(decoded_page, "html.parser")
main_content = soup.select_one("div[id='main-content-others_homepage']")
links = main_content.find_all("a")
for link in links:
href = link.get("href")
if row["name"] not in href or "https://www.tiktok.com" not in href:
continue
views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
views = views_present.text
name = href.split("/")[-1]
try:
int(name)
except:
logger.info(f"Ad found, skipping item: {name}")
continue
video_data = {
"name": href.split("/")[-1],
"url": href,
"views": views
}
print(video_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
- json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML") pulls our JSON response from the page containing the API response.
- We then load the response using json.loads().
- We pull the body from the response and load it into BeautifulSoup for static processing. We cannot load it into the browser; if we do, we will get redirected.
- main_content.find_all("a") finds all of our links. We filter out any bad links that don't contain the proper information.
- views gets a default value of 0. link.select_one("strong[data-e2e='video-views']") checks to see if the video has views. If it does, we save the text of this element to views.
- Our name, the id number of the video, gets extracted from the href of the link.
Step 2: Loading URLs To Scrape
We'll create each URL by simply looking up the profile name. We pull these profiles from our initial report and fetch their pages. We need another trigger function to read our CSV file and run our parser on each row of the file. Take a look at process_results(); it does exactly that.
With process_results(), we read the CSV file into an array of dict objects. We then iterate through them with a for loop and call scrape_channel_content() on each row. Later on, we'll replace this for loop with concurrency.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
scrape_channel_content(row, location, retries)
Our full code now looks like this.
import os
import csv
import json
import logging
import html
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"json_response": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
## Extract Data
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_chunk = html.unescape(page["body"])
soup = BeautifulSoup(decoded_chunk, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]
follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]
user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]
profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)
data_pipeline.add_data(profile_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)
def scrape_channel_content(row, location, retries):
url = f"https://www.tiktok.com/@{row['name']}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
driver.get(url)
## Extract Data
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_page = html.unescape(page["body"])
soup = BeautifulSoup(decoded_page, "html.parser")
main_content = soup.select_one("div[id='main-content-others_homepage']")
links = main_content.find_all("a")
for link in links:
href = link.get("href")
if row["name"] not in href or "https://www.tiktok.com" not in href:
continue
views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
views = views_present.text
name = href.split("/")[-1]
try:
int(name)
except:
logger.info(f"Ad found, skipping item: {name}")
continue
video_data = {
"name": href.split("/")[-1],
"url": href,
"views": views
}
print(video_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
scrape_channel_content(row, location, retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal"
]
## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
logger.info("Starting content scrape...")
process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Content scrape complete")
Step 3: Storing the Scraped Data
We've already got a powerful DataPipeline
. We just need to feed it some dataclass
objects. To do this, we'll write one more class. It will be similar to our ProfileData
class from earlier. It will have the same methods, but our fields will be a bit different.
Take a look at VideoData
. We use it to hold the name
, url
, and views
for each video we scrape.
@dataclass
class VideoData:
name: str = ""
url: str = ""
views: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
In our updated example, we now open a DataPipeline
from inside scrape_channel_content()
. We feed our VideoData
into it until we're done scraping. Once we've completed the parse, we close the pipeline.
import os
import csv
import json
import logging
import html
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"json_response": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class VideoData:
name: str = ""
url: str = ""
views: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
## Extract Data
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_chunk = html.unescape(page["body"])
soup = BeautifulSoup(decoded_chunk, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]
follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]
user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]
profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)
data_pipeline.add_data(profile_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)
def scrape_channel_content(row, location, retries):
url = f"https://www.tiktok.com/@{row['name']}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
driver.get(url)
## Extract Data
video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_page = html.unescape(page["body"])
soup = BeautifulSoup(decoded_page, "html.parser")
main_content = soup.select_one("div[id='main-content-others_homepage']")
links = main_content.find_all("a")
for link in links:
href = link.get("href")
if row["name"] not in href or "https://www.tiktok.com" not in href:
continue
views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
views = views_present.text
name = href.split("/")[-1]
try:
int(name)
except:
logger.info(f"Ad found, skipping item: {name}")
continue
video_data = VideoData(
name=href.split("/")[-1],
url=href,
views=views
)
video_pipeline.add_data(video_data)
success = True
video_pipeline.close_pipeline()
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
scrape_channel_content(row, location, retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal"
]
## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
logger.info("Starting content scrape...")
process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Content scrape complete")
Step 4: Adding Concurrency
Now, we're going to replace the for loop in our trigger function. ThreadPoolExecutor is an excellent tool for this.
Our rewritten function is in the snippet below. The first argument is the function we wish to call, scrape_channel_content. Next, we pass in our CSV file data with reader. All other arguments get passed in as arrays the length of reader.
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel_content,
reader,
[location] * len(reader),
[retries] * len(reader)
)
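If the repeated-list arguments look strange, the toy sketch below (not part of the scraper, just an illustration of executor.map) shows how each iterable is consumed positionally, which is why location and retries get repeated once per row:
import concurrent.futures

def greet(name, location, retries):
    return f"{name} | {location} | retries={retries}"

names = ["alice", "bob", "carol"]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # The i-th item of each iterable is passed together, just like the built-in map():
    # greet("alice", "uk", 3), greet("bob", "uk", 3), greet("carol", "uk", 3)
    for result in executor.map(greet, names, ["uk"] * len(names), [3] * len(names)):
        print(result)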
Step 5: Bypassing Anti-Bots
We've got one final change to make: hooking this new scraper up to the ScrapeOps Proxy Aggregator. We already have our proxy function; we just need to use it in the right place. We'll change the driver.get() line in the parser.
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
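If you want to confirm the proxy hookup before a full run, a quick sketch like this one (assuming the get_scrapeops_url() function and config.json setup from earlier in this article) prints the wrapped URL that Selenium will actually request:
# Sanity check: print the proxied URL that driver.get() will receive.
# Assumes get_scrapeops_url() and the API key loading shown earlier in this article.
test_url = "https://www.tiktok.com/@paranormalpodcast"
print(get_scrapeops_url(test_url, location="uk"))
# The output should start with https://proxy.scrapeops.io/v1/?api_key=...
# and contain the URL-encoded TikTok address.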
Our final code is available for you to see below.
import os
import csv
import json
import logging
import html
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from urllib.parse import urlencode
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"json_response": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class VideoData:
name: str = ""
url: str = ""
views: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
## Extract Data
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_chunk = html.unescape(page["body"])
soup = BeautifulSoup(decoded_chunk, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")
json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]
follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]
user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]
profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)
data_pipeline.add_data(profile_data)
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)
def scrape_channel_content(row, location, retries):
url = f"https://www.tiktok.com/@{row['name']}"
tries = 0
success = False
while tries <= retries and not success:
try:
driver = webdriver.Chrome(options=OPTIONS)
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
driver.get(scrapeops_proxy_url)
## Extract Data
video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
json_stuff = driver.find_element(By.CSS_SELECTOR, "pre").get_attribute("innerHTML")
page = json.loads(json_stuff)
decoded_page = html.unescape(page["body"])
soup = BeautifulSoup(decoded_page, "html.parser")
main_content = soup.select_one("div[id='main-content-others_homepage']")
links = main_content.find_all("a")
for link in links:
href = link.get("href")
if row["name"] not in href or "https://www.tiktok.com" not in href:
continue
views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
views = views_present.text
name = href.split("/")[-1]
try:
int(name)
except:
logger.info(f"Ad found, skipping item: {name}")
continue
video_data = VideoData(
name=href.split("/")[-1],
url=href,
views=views
)
video_pipeline.add_data(video_data)
success = True
video_pipeline.close_pipeline()
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
finally:
driver.quit()
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel_content,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal"
]
## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
logger.info("Starting content scrape...")
process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Content scrape complete")
Step 6: Production Run
Now, we're going to run a full crawl and scrape. As always, feel free to change the config variables: MAX_RETRIES, MAX_THREADS, LOCATION, and channel_list.
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal"
]
## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
logger.info("Starting content scrape...")
process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Content scrape complete")
Here are the final results.
If you remember from earlier, our crawl took 14.646 seconds. This time, the full run took 29.796 seconds, which leaves 29.796 - 14.646 = 15.15 seconds spent scraping videos, or 15.15 / 8 pages = 1.89375 seconds per page.
Just like our crawler example, this is incredibly fast.
Legal and Ethical Considerations
When scraping, we need to be both conscious and cautious about what we're doing. Don't scrape private data. It's best practice to only scrape public data, and that's exactly what we did today. Scraping public data is generally legal; it's no different than taking a picture of a billboard.
Private data (data behind a login) is a completely different story, and there can be disastrous consequences if you're not careful with it.
Legal
Breaking the law when scraping can lead to any of the following:
- Cease and Desist Letters: When a company formally asks you to stop scraping their site.
- Lawsuits: Nobody likes going to court. If you collect data illegally, you can be liable for civil damages and more.
- Prison Time: If you scrape people's private data, you'd better be prepared to face a slew of consequences. This is a serious crime in most countries, punishable by real prison time.
Ethical
- Reputation Damage: No one wants to be in the next headline about unethical business practices. This can seriously damage your personal reputation and that of your company.
- Lawsuits and Suspensions: When you agree to a site's terms, you're signing a legally binding contract. If you violate this contract, you can lose your account or even be subject to a lawsuit.
If you are unsure of your scraper's legality, please consult an attorney.
You can view TikTok's policies using the links below.
Conclusion
Now you know how to scrape both TikTok profile data and aggregate video data from each profile.
You also know how to use the ScrapeOps API to get past not only anti-bots, but nasty redirects as well. Take your new knowledge of parsing, data storage, concurrency, and proxy integration and build something great. These are valuable skills; use them to your advantage.
If you're interested in the tech stack from this article, check out the links below.
More Python Web Scraping Guides
Hopefully you gained some valuable insight from this article. If you're looking for more guides like it, take a look at our Selenium Web Scraping Playbook.
No matter how long you've been scraping the web, we have something for you here at ScrapeOps.
If you'd like more from our "How To Scrape" series, take a look at the articles below.