

How to Scrape TikTok With Python Requests and BeautifulSoup

TikTok is an extremely popular social media platform where users can create and share short videos, typically lasting anywhere from 15 seconds to a few minutes. Imagine a place where dance challenges, cooking hacks, and pets with more personality than most people all coexist in a chaotic, scrolling wonderland.

Today, we're going to learn how to scrape data from TikTok. We'll learn the following:

  • How to crawl TikTok channels and extract profile data.
  • How to scrape the video and photo posts from each channel.
  • How to store our results, add concurrency, and bypass anti-bots with a proxy.

TLDR - How to Scrape TikTok

Looking to scrape TikTok channels? Look no further! The scraper below will scrape any channel you want. Follow the steps below to run it!

  1. Create a new folder with a config.json file in it (place your ScrapeOps API key inside this file). It should look similar to what you see below.
{
"api_key": "YOUR-SUPER-SECRET-API-KEY"
}
  2. Create a Python file inside that same folder and copy/paste this code into it.
import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class VideoData:
name: str = ""
url: str = ""
views: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")

json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]


follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]

user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]

profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)

data_pipeline.add_data(profile_data)

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)

def scrape_channel_content(row, location, retries):
url = f"https://www.tiktok.com/@{row['name']}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
soup = BeautifulSoup(response.text, "html.parser")

main_content = soup.select_one("div[id='main-content-others_homepage']")
links = main_content.find_all("a")

for link in links:
href = link.get("href")
if row["name"] not in href:
continue
views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
views = views_present.text

video_data = VideoData(
name=href.split("/")[-1],
url=href,
views=views
)

video_pipeline.add_data(video_data)
success = True
video_pipeline.close_pipeline()


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel_content,
reader,
[location] * len(reader),
[retries] * len(reader)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal",
"paranormal140",
"paranormal.51"
]

## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

logger.info("Starting content scrape...")

process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Content scrape complete")
  3. Replace the items in the channel_list with the channels you'd like to scrape.

  4. Run the Python file!

python name_of_your_python_file.py

Feel free to change any of the following constants from the main as well:

  • MAX_RETRIES: Defines the maximum number of times the script will attempt to retry scraping a particular TikTok channel or content if the initial request fails. Increase MAX_RETRIES if you want the script to be more persistent in trying to scrape a channel.
  • MAX_THREADS: Determines the number of threads that the script will use for concurrent processing. This means how many channels or content pages the script can scrape simultaneously. Increase MAX_THREADS to speed up the scraping process, especially if you have a large number of channels to scrape.
  • LOCATION: Specifies the geographical location from which the scraping requests should appear to originate. This is useful because TikTok content can vary depending on the user’s location due to regional restrictions or content preferences.

How To Architect Our TikTok Scraper

If you wish to search for a certain category on TikTok, you are immediately blocked and asked to create an account.

Take a look at the screenshot below and see for yourself.

TikTok Login Modal

Our TikTok scraper is actually going to be quite different from other scrapers we've done in the "How To Scrape" series. As you saw above, TikTok is very strict about letting users view data without logging in.

While we can perform a search manually, even this is extremely limited. We can, however, scrape individual channels. Because of these limitations and the fact that TikTok blocks their search page from unauthenticated users, we need to perform our initial search manually.

It's not all bad though. After we've manually gathered the channels we'd like to scrape, it's actually a relatively easy process. We'll also create a scraper that goes through and scrapes all the video content from a channel.

Our crawler and scraper will utilize the following design elements:

  1. Parsing to extract valuable data from different TikTok accounts.
  2. Data Storage to store the data we've extracted.
  3. Concurrency to scrape multiple pages simultaneously.
  4. Proxy Integration to bypass anti-bots and anything else that might get in our way.

Understanding How To Scrape TikTok

Step 1: How To Request TikTok Pages

When looking up a channel on TikTok, you're performing a GET request. Take a look at the page below. Our URL is laid out like this:

https://www.tiktok.com/@paranormalpodcast

We can follow this format for each channel we'd like to scrape:

https://www.tiktok.com/@{name_of_channel}

As mentioned earlier, our channel names need to be gathered manually, but from there, it's gravy.

TikTok Channel Page
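
To make the format concrete, here's a minimal sketch that builds a profile URL for each channel we plan to scrape (the channel_names list here is just a hypothetical example):

channel_names = ["paranormalpodcast", "theparanormalfiles"]

# Build the profile URL for each manually gathered channel name.
channel_urls = [f"https://www.tiktok.com/@{name}" for name in channel_names]

for url in channel_urls:
    print(url)  # e.g. https://www.tiktok.com/@paranormalpodcast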


Step 2: How To Extract Data From TikTok Channels

Metadata from TikTok channels is embedded on the page within a script tag. Once we pull this element from the page, we can use Python's json module and index through it like any other dict object. Take a look below.

TikTok Channel HTML Inspection

The data we're looking for is a script element with an id of __UNIVERSAL_DATA_FOR_REHYDRATION__. This is the data TikTok uses to build the page, and it's exactly the data we're going to scrape.
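
Here's a minimal sketch of that extraction, without the proxy or retry logic we'll add later (the request may well be blocked until we route it through the proxy, so treat this purely as an illustration):

import json
import requests
from bs4 import BeautifulSoup

# Fetch a channel page and pull the embedded JSON blob out of the script tag.
response = requests.get("https://www.tiktok.com/@paranormalpodcast")
soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")

# The tag's text is JSON, so we can load it into a regular Python dict.
json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
print(user_info["stats"]["followerCount"])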


Step 3: Geolocated Data

The data we're scraping is not location based. However, when a website blocks you, it typically does so based on your IP address. Our scraper is going to be far faster than a normal human, and we need to take this into account.

We'll be using the ScrapeOps Proxy API to take care of this. Along with rotating our IP address, the ScrapeOps API will also route us through servers in whatever location we specify. We can do this using the country param.

  • If we pass "country": "us", ScrapeOps will route us through a server in the US.
  • If we pass "uk" in as our country, ScrapeOps will route us through the UK.
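
As a quick illustration, the sketch below builds the same proxied URL for two different countries; only the country parameter changes (the API key is a placeholder):

from urllib.parse import urlencode

API_KEY = "YOUR-SUPER-SECRET-API-KEY"  # placeholder, not a real key
url = "https://www.tiktok.com/@paranormalpodcast"

for country in ["us", "uk"]:
    payload = {"api_key": API_KEY, "url": url, "country": country}
    # Same target URL, routed through a different country each time.
    print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))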

Setting Up Our TikTok Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir tiktok-scraper

cd tiktok-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build A TikTok Channel Crawler

Step 1: Create Channel Data Parser

The core of our crawler is the parsing function. In this case, our parser will fetch a TikTok channel. After getting the channel, it pulls a script (JavaScript) element from the page. Embedded within this JavaScript is a JSON blob. The JSON blob holds all sorts of interesting information about the channel.

Along with some basic structure and retry logic, this script does exactly that. Take a look at the Python script below.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_channel(channel_name, location, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")

json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]


follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]

user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]

profile_data = {
"name": unique_id,
"follower_count": follower_count,
"likes": likes,
"video_count": video_count,
"nickname": nickname,
"verified": verified,
"signature": signature
}

print(profile_data)

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



def start_scrape(channel_list, location, max_threads=5, retries=3):
for channel in channel_list:
scrape_channel(channel, location, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

logger.info(f"Scrape starting...")

## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal",
"paranormal140",
"paranormal.51"
]

## Job Processes
start_scrape(channel_list, LOCATION, retries=MAX_RETRIES)
logger.info(f"Scrape complete.")

While we still have tries left and the operation has not succeeded:

  • We find the relevant JavaScript with soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']").
  • json.loads(script_tag.text) converts the text of the script tag into a dict we can index in Python.
  • We pull all the relevant data out of the JSON:
    • name
    • follower_count
    • likes
    • video_count
    • nickname
    • verified
    • signature

Step 2: Storing the Scraped Data

After we parse our data, we need to store it. In this section, we're going to add two classes: ProfileData and DataPipeline.

  1. ProfileData is used specifically for holding information from the profiles we scrape.
  2. The DataPipeline object takes a dataclass (in this case ProfileData) and pipes it into a CSV file while removing duplicates.

Here is our ProfileData class.

@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

Here is our DataPipeline.

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

In our full script, instead of printing the data, we now pass it into the pipeline (a quick usage sketch follows the list below).

  • The pipeline then takes in all this data, filters out any duplicates and pipes it to a CSV file.
  • If the file already exists, we append to it.
  • If the file does not exist, we create it.
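
Here's a quick usage sketch, assuming the ProfileData and DataPipeline classes above are already defined (the example values are made up):

pipeline = DataPipeline(csv_filename="channels.csv")

profile = ProfileData(name="paranormalpodcast", follower_count=1000, likes=5000)
pipeline.add_data(profile)   # queued for writing (deduplicated by name)
pipeline.add_data(profile)   # logged and dropped as a duplicate

pipeline.close_pipeline()    # flushes anything left in the queue to channels.csv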

Here is our full code at this point.

import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")

json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]


follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]

user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]

profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)

data_pipeline.add_data(profile_data)

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



def start_scrape(channel_list, location, data_pipeline=None, retries=3):
for channel in channel_list:
scrape_channel(channel, location, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

logger.info(f"Scrape starting...")

## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal",
"paranormal140",
"paranormal.51"
]

## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Scrape complete.")

Step 3: Adding Concurrency

Before our code is production ready, we need to add concurrency. Here, we're going to use ThreadPoolExecutor to spawn scrape_channel() on multiple threads. This will greatly increase our speed and efficiency.

The code snippet below replaces our for loop and runs scrape_channel() with ThreadPoolExecutor.

def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)

Take a look at the arguments we pass into executor.map():

  • Our first argument, scrape_channel, tells the executor to run scrape_channel() on every available thread.
  • channel_list is the list of channels we want to pass into scrape_channel().
  • We then pass location, data_pipeline, and retries in as lists, one element for each individual call.
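
If executor.map() feels unfamiliar, the toy sketch below shows how those parallel lists get paired up, one element of each list per call (the scrape() function here is just a stand-in):

import concurrent.futures

def scrape(channel, location, retries):
    # Stand-in for scrape_channel(); just shows which arguments each call receives.
    return f"{channel} / {location} / {retries}"

channels = ["a", "b", "c"]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(
        scrape,
        channels,
        ["uk"] * len(channels),
        [3] * len(channels)
    )
    print(list(results))  # ['a / uk / 3', 'b / uk / 3', 'c / uk / 3']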

Our full code now looks like this.

import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")

json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]


follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]

user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]

profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)

data_pipeline.add_data(profile_data)

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

logger.info(f"Scrape starting...")

## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal",
"paranormal140",
"paranormal.51"
]

## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Scrape complete.")

Step 4: Bypassing Anti-Bots

Anti-bots are an unending source of headaches for web developers all over the world. Even though our scraper isn't malicious, anti-bots tend to see scrapers as malware.

In order to get around anti-bots (and anything else for that matter), we'll be using the ScrapeOps Proxy API with a residential proxy.

The snippet below holds the key to all of this.

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

get_scrapeops_url() takes in a number of arguments and converts any URL into a ScrapeOps proxied URL. Here are the individual parameters.

  • "api_key": is your ScrapeOps API key.
  • "url": is the url that you'd like to scrape.
  • "country": is the location you'd like to be routed through.
  • "residential": is a boolean value. When we set residential to True, we're telling ScrapeOps that we want a residential IP address. Anti-bots are far less likely to block a residential IP than a data center IP.

You can view our production level code below.

import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")

json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]


follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]

user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]

profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)

data_pipeline.add_data(profile_data)

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

logger.info(f"Scrape starting...")

## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal",
"paranormal140",
"paranormal.51"
]

## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Scrape complete.")

Step 5: Production Run

Let's run this code in production and see how it does. If you need a refresher, here is our main. MAX_RETRIES is set to 3. MAX_THREADS is set to 5, and our location is set to "uk". Feel free to change any of these constants.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

logger.info(f"Scrape starting...")

## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal",
"paranormal140",
"paranormal.51"
]

## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Scrape complete.")

Here are our results.

Scraper Results in Terminal

In total, we scraped 10 channels in 14.235 seconds. This comes out to roughly 1.4 seconds per page. This is lightning fast. It's not uncommon for a web scraper to take 7 to 10 seconds per page!


Build A TikTok Scraper

In this section, we're going to scrape the video and photo posts from the individual channels we crawled earlier. We'll add a wait parameter to the ScrapeOps URL, and we'll pull some data out of deeply nested elements. This scraper needs to do the following:

  1. Read the CSV file.
  2. Parse each channel from the file.
  3. Store the data we parsed from each channel.
  4. Process multiple channels simultaneously using concurrency.
  5. Once again, integrate with the ScrapeOps Proxy API.

Step 1: Create Simple Content Data Parser

We'll start out with a simple parsing function. As usual, it includes basic error handling, retries, and parsing. Take a look at our scrape_channel_content() function. It looks a lot like our first parsing function.

def scrape_channel_content(row, location, retries):
url = f"https://www.tiktok.com/@{row['name']}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

main_content = soup.select_one("div[id='main-content-others_homepage']")
links = main_content.find_all("a")

for link in links:
href = link.get("href")
if row["name"] not in href:
continue
views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
views = views_present.text

video_data = {
"name": href.split("/")[-1],
"url": href,
"views": views
}

print(video_data)
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
  • First, we find all the link elements (videos or photos) with main_content.find_all("a").
  • Then, we filter out links that don't include the channel name.
  • We then pull the views, url, and name (id number) from the link element.

Step 2: Loading URLs To Scrape

In order to parse this data, we need to feed URLs into our parsing function. To do this, we'll write a new function similar to start_scrape().

We'll call this one process_results(). This function will read our CSV into an array object. Then it will iterate through all the rows of the array and call scrape_channel_content() on them.

Here is process_results().

def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
scrape_channel_content(row, location, retries=retries)

After putting it together, our code now looks like this.

import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())



class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")

json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]


follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]

user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]

profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)

data_pipeline.add_data(profile_data)

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)

def scrape_channel_content(row, location, retries):
url = f"https://www.tiktok.com/@{row['name']}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")

main_content = soup.select_one("div[id='main-content-others_homepage']")
links = main_content.find_all("a")

for link in links:
href = link.get("href")
if row["name"] not in href:
continue
views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
views = views_present.text

video_data = {
"name": href.split("/")[-1],
"url": href,
"views": views
}

print(video_data)
success = True


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
scrape_channel_content(row, location, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal",
"paranormal140",
"paranormal.51"
]

## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

logger.info("Starting content scrape...")

process_results("channels.csv", LOCATION, retries=MAX_RETRIES)
logger.info("Content scrape complete")

process_results() reads our CSV file into an array. It then runs scrape_channel_content() on each row from the file.


Step 3: Storing the Scraped Data

We now need to store our scraped data. We've already got our DataPipeline; we just need a dataclass to pass into it. We'll call this one VideoData. This class will hold the following:

  • name: the unique number given to the photo or video.
  • url: the url where we can find the photo or video.
  • views: the number of views the photo or video has received.

Here it is.

@dataclass
class VideoData:
name: str = ""
url: str = ""
views: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

In our updated code below, we create a new DataPipeline and pass VideoData objects into it.

import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class VideoData:
name: str = ""
url: str = ""
views: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")

json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]


follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]

user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]

profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)

data_pipeline.add_data(profile_data)

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)

def scrape_channel_content(row, location, retries):
url = f"https://www.tiktok.com/@{row['name']}"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
soup = BeautifulSoup(response.text, "html.parser")

main_content = soup.select_one("div[id='main-content-others_homepage']")
links = main_content.find_all("a")

for link in links:
href = link.get("href")
if row["name"] not in href:
continue
views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
views = views_present.text

video_data = VideoData(
name=href.split("/")[-1],
url=href,
views=views
)

video_pipeline.add_data(video_data)
success = True
video_pipeline.close_pipeline()


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
scrape_channel_content(row, location, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal",
"paranormal140",
"paranormal.51"
]

## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

logger.info("Starting content scrape...")

process_results("channels.csv", LOCATION, retries=MAX_RETRIES)
logger.info("Content scrape complete")

Our new dataclass gives us almost everything we need to properly scrape the content from all of these channels.


Step 4: Adding Concurrency

We'll add concurrency exactly the way we did before. We'll refactor process_results() to take advantage of multithreading with ThreadPoolExecutor.

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel_content,
reader,
[location] * len(reader),
[retries] * len(reader)
)

In our process_results() function, we now pass scrape_channel_content in as our first argument. Then we pass in the reader object (a list of dict objects we want to process). location and retries both get passed in as lists as well.


Step 5: Bypassing Anti-Bots

To bypass anti-bots, we'll once again be using get_scrapeops_url(). Before we call it again in our code, we're going to add one more argument to it, "wait": 2000.

This will tell the ScrapeOps server to wait 2 seconds for content to render before sending it back to us. We need to do this so that the videos and photos from these channels can be fetched and loaded into the page.

Here is our finished proxy function.

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url

After this, we need to change a small portion of our code.

scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)

You can view our full project ready to run in production below.

import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"residential": True,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class ProfileData:
name: str = ""
follower_count: int = 0
likes: int = 0
video_count: int = 0
nickname: str = ""
verified: bool = False
signature: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class VideoData:
name: str = ""
url: str = ""
views: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()


def scrape_channel(channel_name, location, data_pipeline=None, retries=3):
url = f"https://www.tiktok.com/@{channel_name}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)

response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
script_tag = soup.select_one("script[id='__UNIVERSAL_DATA_FOR_REHYDRATION__']")

json_data = json.loads(script_tag.text)
user_info = json_data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
stats = user_info["stats"]


follower_count = stats["followerCount"]
likes = stats["heartCount"]
video_count = stats["videoCount"]

user_data = user_info["user"]
unique_id = user_data["uniqueId"]
nickname = user_data["nickname"]
verified = user_data["verified"]
signature = user_data["signature"]

profile_data = ProfileData(
name=unique_id,
follower_count=follower_count,
likes=likes,
video_count=video_count,
nickname=nickname,
verified=verified,
signature=signature
)

data_pipeline.add_data(profile_data)

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")



def start_scrape(channel_list, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel,
channel_list,
[location] * len(channel_list),
[data_pipeline] * len(channel_list),
[retries] * len(channel_list)
)

def scrape_channel_content(row, location, retries):
url = f"https://www.tiktok.com/@{row['name']}"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code == 200:
success = True

else:
raise Exception(f"Failed request, Status Code {response.status_code}")

## Extract Data

video_pipeline = DataPipeline(csv_filename=f"{row['name']}.csv")
soup = BeautifulSoup(response.text, "html.parser")

main_content = soup.select_one("div[id='main-content-others_homepage']")
links = main_content.find_all("a")

for link in links:
href = link.get("href")
if row["name"] not in href:
continue
views = 0
views_present = link.select_one("strong[data-e2e='video-views']")
if views_present:
views = views_present.text

video_data = VideoData(
name=href.split("/")[-1],
url=href,
views=views
)

video_pipeline.add_data(video_data)
success = True
video_pipeline.close_pipeline()


except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries+=1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_channel_content,
reader,
[location] * len(reader),
[retries] * len(reader)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of channels to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal",
"paranormal140",
"paranormal.51"
]

## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

logger.info("Starting content scrape...")

process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Content scrape complete")

Step 6: Production Run

Time to run this thing in production. If you need a refresher, here is the main block we'll be working with. Feel free to change any of the following constants:

  • MAX_RETRIES
  • MAX_THREADS
  • LOCATION
  • channel_list
if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "uk"

logger.info(f"Crawl starting...")

## INPUT ---> List of channels to scrape
channel_list = [
"paranormalpodcast",
"theparanormalfiles",
"jdparanormal",
"paranormal.com7",
"paranormal064",
"marijoparanormal",
"paranormal_activityghost",
"youtube_paranormal",
"paranormal140",
"paranormal.51"
]

## Job Processes
crawl_pipeline = DataPipeline(csv_filename="channels.csv")
start_scrape(channel_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

logger.info("Starting content scrape...")

process_results("channels.csv", LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Content scrape complete")

Here are the results.

Scraper Results in Terminal

Since we added the 2 second wait, we'll add roughly 20 seconds (2 seconds x 10 channels) to the crawl from earlier (14.235 seconds), putting the estimated crawl at 34.235 seconds, or about 3.4 seconds per page. The full run took 106.536 seconds, so the content scrape accounts for 106.536 - 34.235 = 72.301 seconds, which works out to 7.2301 seconds per channel. The content scraper does heavier parsing work than the crawler, and it shows up in the benchmark.
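Laid out as a quick sketch, the arithmetic from this run looks like the snippet below (the timings are specific to this run; yours will differ).

# Benchmark arithmetic from the run above (illustrative only)
crawl_no_wait = 14.235               # earlier crawl, before the 2-second wait was added
wait_overhead = 2 * 10               # 2-second wait x 10 channels
crawl_estimate = crawl_no_wait + wait_overhead   # 34.235 seconds
total_runtime = 106.536              # full crawl + content scrape

scrape_time = total_runtime - crawl_estimate     # 72.301 seconds
print(crawl_estimate / 10)           # ~3.42 seconds per page for the crawl
print(scrape_time / 10)              # ~7.23 seconds per channel for the content scrape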


Legal and Ethical Considerations

Whenever you access a website, whether you're scraping it or viewing it in your browser, you are subject to that site's Terms of Service and its robots.txt.

You may view TikTok's terms here and their robots.txt is available here.

It's important to examine both of these files because violating them can get you blocked or even permanently banned.
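If you'd like to check robots.txt programmatically rather than by eye, Python's standard library ships with urllib.robotparser. The sketch below is just an illustration; the profile URL is an example.

from urllib.robotparser import RobotFileParser

# Minimal sketch: ask whether a generic crawler may fetch a given TikTok URL
robots = RobotFileParser()
robots.set_url("https://www.tiktok.com/robots.txt")
robots.read()

test_url = "https://www.tiktok.com/@paranormalpodcast"  # example profile URL
print(robots.can_fetch("*", test_url))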

If you're unsure about the legality of a scraping project, the general rule is that public data (data not gated behind a login) is fair game to scrape.

If the data is gated behind a login or some other type of authentication, it is considered private, and you will be subject to the relevant privacy and intellectual property laws.

Any time you're not sure if your data is public or private, make sure to consult an attorney.


Conclusion

You did it! You now know how to use Requests and BeautifulSoup, and you've got a solid grasp of working with JSON. You also have a decent understanding of parsing to extract your data, data storage to save it to a CSV file, concurrency to make everything happen at once, and proxy integration to get past any roadblocks that come your way.


More Python Web Scraping Guides

Here at ScrapeOps, we've got loads of learning materials for you to choose from. Whether you're just starting out or you're a veteran dev, we have something for you.

Check out our extensive Python Web Scraping Playbook or read one of the articles below and add another piece to your scraping toolbox.