How to Scrape Quora With Requests and BeautifulSoup
Quora is a question and answer website that is home to a wealth of information. Since its founding in 2009, Quora has been a great place to ask a question and get an answer. This data could be used to understand consumer behavior, identify pain points, and discover new product opportunities.
In this tutorial, we'll learn how to scrape Quora using Python Requests and BeautifulSoup.
- TLDR How to Scrape Quora
- How To Architect Our Scraper
- Understanding How To Scrape Quora
- Setting Up Our Quora Scraper
- Build A Quora Search Crawler
- Build A Quora Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
The full code for this Quora Scraper is available on Github here.
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Quora
Need to scrape Quora but don't have time to code? Use the scraper below!
Create a new project folder and add a config.json
file with your "api_key"
. After you're done, add a file with the code below. Feel free to change any of the constants inside the main
to tweak your results.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
# Load the ScrapeOps API key from config.json, which must contain an
# "api_key" entry. API_KEY starts out empty and is overwritten once the
# file has been parsed.
API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)

API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
    """Wrap a target URL in a ScrapeOps proxy request URL.

    Args:
        url: The page to fetch through the proxy.
        location: Value sent as the proxy's "country" parameter.

    Returns:
        The proxy endpoint with the API key, target URL, country and a
        "wait" value of 2000 encoded into its query string.
    """
    query_string = urlencode({
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000,
    })
    return "https://proxy.scrapeops.io/v1/?" + query_string
## Logging
# Configure the root logger at INFO level, then create the module-level
# logger that every function below writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
    """One search result: its title, its link, and its position in the results."""

    name: str = ""
    url: str = ""
    rank: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Normalise every string attribute in place.

        An empty string becomes "No <field name>"; any other string has its
        surrounding whitespace stripped. Non-string values are left alone.
        """
        for spec in fields(self):
            current = getattr(self, spec.name)
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)
@dataclass
class ReplyData:
    """One answer on a post: the author's name and the reply text."""

    name: str = ""
    reply: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        """Normalise every string attribute in place.

        An empty string becomes "No <field name>"; any other string has its
        surrounding whitespace stripped. Non-string values are left alone.
        """
        for spec in fields(self):
            current = getattr(self, spec.name)
            if not isinstance(current, str):
                continue
            cleaned = current.strip() if current != "" else f"No {spec.name}"
            setattr(self, spec.name, cleaned)
class DataPipeline:
    """Queue scraped dataclass records, drop repeats by name, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        """
        Args:
            csv_filename: Path of the CSV file that records are appended to.
            storage_queue_limit: Queue length at which add_data() flushes to disk.
        """
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows([asdict(item) for item in data_to_save])
        finally:
            # Always clear the flag, including on the empty-queue early return
            # and on write errors; otherwise add_data() would never flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same name was already seen; otherwise remember the name."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a record unless it is a duplicate, flushing once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued; wait briefly first if a save is in progress."""
        # Imported here because the file's import block does not include time.
        import time

        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of Google results restricted to quora.com.

    Every result that has both an <h3> title and an <a> link is turned into
    a SearchData record and passed to data_pipeline.add_data().

    Args:
        keyword: Search phrase; spaces are replaced with "+".
        location: Country code forwarded to get_scrapeops_url().
        page_number: Zero-based page index; sent to Google as start=page_number * 10.
        data_pipeline: Object whose add_data() method receives each record.
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    first_result = page_number * 10
    url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={first_result}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            # Restart the counter on each attempt so a retry does not inflate ranks.
            result_number = first_result
            for div_card in soup.select("div span"):
                name = div_card.find("h3")
                link = div_card.find("a")
                if not name or not link:
                    continue
                result_number += 1
                search_data = SearchData(
                    name=name.text,
                    url=link.get("href"),
                    rank=result_number,
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Mark success only after parsing finished, so a parsing error is retried
            # instead of silently ending the loop.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt; without this the loop never terminates.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    """Run scrape_search_results() for pages 0..pages-1 on a thread pool.

    Args:
        keyword: Search phrase passed to every worker.
        pages: Number of result pages to request.
        location: Country code passed to every worker.
        data_pipeline: Pipeline object shared by all workers.
        max_threads: Size of the thread pool.
        retries: Retry budget passed to every worker.
    """
    # executor.map() takes one iterable per positional argument, so the
    # values shared by all pages are repeated once per page.
    keywords = [keyword] * pages
    locations = [location] * pages
    pipelines = [data_pipeline] * pages
    retry_budgets = [retries] * pages

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as pool:
        pool.map(
            scrape_search_results,
            keywords,
            locations,
            range(pages),
            pipelines,
            retry_budgets,
        )
def process_post(row, location, retries=3):
    """Scrape the answers of one Quora post into a CSV named after the post.

    Args:
        row: Mapping with "url" and "name" keys (one row of the crawl CSV).
        location: Country code forwarded to get_scrapeops_url().
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # The request is made inside the try block so that connection
            # errors go through the retry loop like bad status codes do.
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code != 200:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
            logger.info(f"Status: {response.status_code}")

            soup = BeautifulSoup(response.text, "html.parser")
            main_content = soup.select_one("div[id='mainContent']")
            if main_content is None:
                raise Exception("No div[id='mainContent'] found in the response")
            answer_cards = main_content.select("div div div div div[class='q-box']")

            answer_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
            last_seen_name = ""

            for answer_card in answer_cards:
                # The card text is split on "·": the first piece is stored as
                # the name and the second-to-last piece as the reply.
                array = answer_card.text.split("·")
                if len(array) < 3:
                    continue

                promoted = "Promoted" in array[0]
                related = "Related" in array[2][0:30] or "Related" in array[-2][0:30]
                # Substring containment already covers the equal-strings case.
                repeat_name = array[0] in last_seen_name
                if promoted or related or repeat_name:
                    last_seen_name = array[0]
                    continue

                reply_data = ReplyData(
                    name=array[0],
                    reply=array[-2],
                )
                answer_pipeline.add_data(reply_data)

            answer_pipeline.close_pipeline()
            success = True
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
    """Read the crawl CSV and run process_post() on each row using a thread pool.

    Args:
        csv_file: Path of the CSV produced by the crawl.
        location: Country code passed to every worker.
        max_threads: Size of the thread pool.
        retries: Retry budget passed to every worker.
    """
    logger.info(f"processing {csv_file}")
    # DataPipeline writes its files as UTF-8, so read them back with the same
    # encoding instead of relying on the platform default.
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_post,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )
if __name__ == "__main__":
    # Tunable settings for the whole run.
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    # Crawl phase: one CSV of search results per keyword.
    for keyword in keyword_list:
        csv_name = keyword.replace(" ", "-") + ".csv"
        crawl_pipeline = DataPipeline(csv_filename=csv_name)
        start_scrape(
            keyword,
            PAGES,
            LOCATION,
            data_pipeline=crawl_pipeline,
            max_threads=MAX_THREADS,
            retries=MAX_RETRIES,
        )
        crawl_pipeline.close_pipeline()
        aggregate_files.append(csv_name)
    logger.info(f"Crawl complete.")

    # Scrape phase: visit every post listed in each crawl CSV.
    for crawl_csv in aggregate_files:
        process_results(crawl_csv, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Feel free to change any of the following constants:
- MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
- MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
- PAGES: Determines how many pages of Google search results to scrape for each keyword.
- LOCATION: Specifies the geographical location (country) for the Google search.
- keyword_list: This is a list of keywords for which the script will perform the search and subsequent scraping.
How To Architect Our Quora Scraper
This project consists of two parts: a crawler and a scraper. The job of the crawler is to perform a search on a certain topic, and the scraper will scrape individual posts from that search.
Our crawler needs to perform the following actions:
- Search Quora for a specific topic and parse the results.
- Use pagination to control our results.
- Save the extracted data with proper data storage.
- Concurrently search multiple pages of results at once.
- Use proxy integration to get past anti-bots and any other potential roadblocks that may be in our way.
After the crawl, our scraper will execute these actions:
- Read the CSV from the crawl.
- Lookup each individual post from the CSV and parse the results.
- Store the relevant data from the step above.
- Perform the two steps above concurrently on multiple posts.
- Integrate with a proxy once again to avoid anti-bots and anything else that might get in our way.
Understanding How To Scrape Quora
Scraping Quora is not our typical project. Take a look at what happens when you try to search Quora without an account. You are immediately blocked and asked to login.
As you saw above, we cannot actually perform a Quora search without logging in.
However, there is a way around this. First, we need to get a better understanding of this task at a high level.
As always, we're going to build both a crawler and a scraper.
- The crawler will scrape Quora posts from Google.
- The scraper will pull information about those individual topics directly from Quora. If you're not familiar with scraping Google results, we've got an article on that here.
Throughout this project, we'll make use of:
- Parsing
- Pagination
- Data Storage
- Concurrency
- Proxy Integration
Step 1: How To Request Quora Pages
To search Quora without an account, we're going to use Google. When you lookup Quora on Google, you are actually given an option to search Quora directly through Google.
Take a look at the screenshot below.
Here are the actual results from the search. If you look at the URL, this is something we can use to construct urls programmatically:
https://www.google.com/search?q=learn%20rust%20site%3Aquora.com
When searching programmatically, it would look like this
https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com
To request individual posts, we'll be extracting links directly from the search results above. If you click on the first result, you get this URL:
https://www.quora.com/How-do-I-learn-the-Rust-programming-language
Even though we won't need to build these urls manually, you should understand that they're laid out like this:
https://www.quora.com/Name-of-your-post
This is what the posts look like in your actual browser.
Step 2: How To Extract Data From Quora Results and Pages
Now, let's get a better look at the data we'll be extracting. Our Google results are nested incredibly deeply within the page. Our Quora results aren't much easier to extract because the replies are nested just as deeply.
Here are the Google results.
Here is an example Quora response card. Each div
card has a class of q-box
.
Step 3: How To Control Pagination
To control pagination with Google, we'll use the following layout in our URL:
https://www.google.com/search?q={query}&start={page * 10}
When using Google, each result has a unique number and we receive approximately 10 results per page.
- So page 0 would give us results 1 through 10.
- Page 1 gives us results 11 through 20, and so on and so forth.
Step 4: Geolocated Data
We don't need specific geolocation support for Quora, but we'll be using the ScrapeOps Proxy API which provides this for us anyway.
When you use the API, you can give a parameter, country
and you'll be routed through a server in the country of your choosing.
- If we want to appear in the US, we can pass "country": "us".
- To appear in the UK, we can pass "country": "uk".
Setting Up Our Quora Scraper Project
Let's get started. You can run the following commands to get setup.
Create a New Project Folder
mkdir quora-scraper
cd quora-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build A Quora Search Crawler
As previously mentioned, we're going to use Google to crawl Quora. We'll add the following to our scraper in the next few sections:
- Parsing
- Pagination
- Data Storage
- Concurrency
- Proxy Integration
Step 1: Create Simple Search Data Parser
We'll start by creating a parsing function. The code below sets our basic structure for the rest of the project. We begin with some retry logic and basic error handling but you really need to pay attention to our parsing logic.
Here is the code we'll start with.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, retries=3):
    """Fetch the first page of Google results restricted to quora.com and parse it.

    Args:
        keyword: Search phrase; spaces are replaced with "+".
        location: Country code forwarded to get_scrapeops_url().
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            # The counter must exist before the first "+= 1"; it restarts on
            # every attempt so a retry does not inflate ranks.
            result_number = 0
            for div_card in soup.select("div span"):
                name = div_card.find("h3")
                link = div_card.find("a")
                if not name or not link:
                    continue
                result_number += 1
                # Built but not yet stored; storage is added in a later step
                # of the tutorial.
                search_data = {
                    "name": name.text,
                    "url": link.get("href"),
                    "rank": result_number
                }

            logger.info(f"Successfully parsed data from: {url}")
            # Mark success only after parsing finished, so a parsing error is retried.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt; without this the loop never terminates.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
When parsing this search data from Google, we:
- Find all of our result cards with soup.select("div span").
- For each div_card:
  - name = div_card.find("h3") finds the post name on Quora.
  - link = div_card.find("a") finds the link to the post.
Step 2: Add Pagination
Adding pagination is a pretty simple task here. Google's results are all given a number and we get 10 results per page. Our pages are counted from zero: page 0 gives us results 1 through 10, page 1 gives us results 11 through 20, and so on and so forth.
To add these results, we need to add a start
parameter to our URL.
With the start param added in, our URL now looks like this:
https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}
We'll also add a start_scrape()
function. This one is pretty simple at the moment. It iterates through our pages
and runs scrape_search_results()
on each page.
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
Take a look at our full code now.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scrape_search_results(keyword, location, page_number, retries=3, data_pipeline=None):
    """Fetch one page of Google results restricted to quora.com and parse it.

    Args:
        keyword: Search phrase; spaces are replaced with "+".
        location: Country code forwarded to get_scrapeops_url().
        page_number: Zero-based page index; sent to Google as start=page_number * 10.
        retries: Number of extra attempts allowed after the first failure.
        data_pipeline: Accepted because start_scrape() passes it by keyword;
            it is not used at this step.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    first_result = page_number * 10
    url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={first_result}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            # Restart the counter on each attempt so a retry does not inflate ranks.
            result_number = first_result
            for div_card in soup.select("div span"):
                name = div_card.find("h3")
                link = div_card.find("a")
                if not name or not link:
                    continue
                result_number += 1
                # Built but not yet stored; storage is added in the next step
                # of the tutorial.
                search_data = {
                    "name": name.text,
                    "url": link.get("href"),
                    "rank": result_number
                }

            logger.info(f"Successfully parsed data from: {url}")
            # Mark success only after parsing finished, so a parsing error is retried.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt; without this the loop never terminates.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")
Step 3: Storing the Scraped Data
Next, we need to store the data we've scraped. In order to do this, we need to add a couple classes. First, we'll add a SearchData
class and then we'll add a DataPipeline
class.
- SearchData is a dataclass that exists specifically to hold data we've scraped. Once we've transformed a result on the page into SearchData, we need to pass it into our DataPipeline.
- The DataPipeline pipes this data to a CSV file and filters out our duplicates.
Here is our SearchData
. It holds a name
, url
, and rank
.
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
Here is our DataPipeline
.
class DataPipeline:
    """Queue scraped dataclass records, drop repeats by name, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        """
        Args:
            csv_filename: Path of the CSV file that records are appended to.
            storage_queue_limit: Queue length at which add_data() flushes to disk.
        """
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows([asdict(item) for item in data_to_save])
        finally:
            # Always clear the flag, including on the empty-queue early return
            # and on write errors; otherwise add_data() would never flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same name was already seen; otherwise remember the name."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a record unless it is a duplicate, flushing once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued; wait briefly first if a save is in progress."""
        # Imported here because the file's import block does not include time.
        import time

        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
Now, we need to add these pieces into our full code. You can see what this looks like below.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
    """Queue scraped dataclass records, drop repeats by name, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        """
        Args:
            csv_filename: Path of the CSV file that records are appended to.
            storage_queue_limit: Queue length at which add_data() flushes to disk.
        """
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows([asdict(item) for item in data_to_save])
        finally:
            # Always clear the flag, including on the empty-queue early return
            # and on write errors; otherwise add_data() would never flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same name was already seen; otherwise remember the name."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a record unless it is a duplicate, flushing once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued; wait briefly first if a save is in progress."""
        # Imported here because the file's import block does not include time.
        import time

        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of Google results restricted to quora.com.

    Every result that has both an <h3> title and an <a> link is turned into
    a SearchData record and passed to data_pipeline.add_data().

    Args:
        keyword: Search phrase; spaces are replaced with "+".
        location: Country code forwarded to get_scrapeops_url().
        page_number: Zero-based page index; sent to Google as start=page_number * 10.
        data_pipeline: Object whose add_data() method receives each record.
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    first_result = page_number * 10
    url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={first_result}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            # Restart the counter on each attempt so a retry does not inflate ranks.
            result_number = first_result
            for div_card in soup.select("div span"):
                name = div_card.find("h3")
                link = div_card.find("a")
                if not name or not link:
                    continue
                result_number += 1
                search_data = SearchData(
                    name=name.text,
                    url=link.get("href"),
                    rank=result_number,
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Mark success only after parsing finished, so a parsing error is retried
            # instead of silently ending the loop.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt; without this the loop never terminates.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
In our main, we now create a DataPipeline
. This DataPipeline
then gets passed into start_scrape()
and then scrape_search_results()
. We then use the add_data()
method to add this data to our pipeline.
Step 4: Adding Concurrency
Concurrency is vital when you're scraping at scale. start_scrape()
already gives us the ability to run scrape_search_results()
on multiple pages, but we want to run it on multiple pages simultaneously. In order to accomplish this, we'll be using ThreadPoolExecutor
.
Here is our new start_scrape()
function.
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
- scrape_search_results is the first argument passed into executor.map(). This is the function we want to run on each available thread.
- All other arguments get passed into executor.map() as arrays that then get passed in on the individual threads.
Here is our full code up to this point.
import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 2000
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
    """Queue scraped dataclass records, drop repeats by name, and append them to a CSV file."""

    def __init__(self, csv_filename="", storage_queue_limit=50):
        """
        Args:
            csv_filename: Path of the CSV file that records are appended to.
            storage_queue_limit: Queue length at which add_data() flushes to disk.
        """
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        """Append every queued record to the CSV file and empty the queue.

        The header row is written only when the file is missing or empty.
        """
        self.csv_file_open = True
        try:
            data_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [spec.name for spec in fields(data_to_save[0])]
            file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
            with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows([asdict(item) for item in data_to_save])
        finally:
            # Always clear the flag, including on the empty-queue early return
            # and on write errors; otherwise add_data() would never flush again.
            self.csv_file_open = False

    def is_duplicate(self, input_data):
        """Return True if a record with the same name was already seen; otherwise remember the name."""
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        """Queue a record unless it is a duplicate, flushing once the queue limit is reached."""
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        """Write whatever is still queued; wait briefly first if a save is in progress."""
        # Imported here because the file's import block does not include time.
        import time

        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    """Scrape one page of Google results restricted to quora.com.

    Every result that has both an <h3> title and an <a> link is turned into
    a SearchData record and passed to data_pipeline.add_data().

    Args:
        keyword: Search phrase; spaces are replaced with "+".
        location: Country code forwarded to get_scrapeops_url().
        page_number: Zero-based page index; sent to Google as start=page_number * 10.
        data_pipeline: Object whose add_data() method receives each record.
        retries: Number of extra attempts allowed after the first failure.

    Raises:
        Exception: If every attempt fails.
    """
    formatted_keyword = keyword.replace(" ", "+")
    first_result = page_number * 10
    url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={first_result}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Recieved [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            # Restart the counter on each attempt so a retry does not inflate ranks.
            result_number = first_result
            for div_card in soup.select("div span"):
                name = div_card.find("h3")
                link = div_card.find("a")
                if not name or not link:
                    continue
                result_number += 1
                search_data = SearchData(
                    name=name.text,
                    url=link.get("href"),
                    rank=result_number,
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            # Mark success only after parsing finished, so a parsing error is retried
            # instead of silently ending the loop.
            success = True
        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt; without this the loop never terminates.
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
[keyword] * pages,
[location] * pages,
range(pages),
[data_pipeline] * pages,
[retries] * pages
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 1
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")