How to Scrape Quora With Selenium
Quora is a popular question-and-answer platform, housing valuable information across various topics. Since its launch in 2009, Quora has served as a go-to destination for people looking to ask questions and get insightful answers from a global community of experts. This data can be scraped and analyzed to understand trends, explore customer pain points, and gain insights into market opportunities.
In this tutorial, we will learn how to build a Quora scraper using Selenium.
- TLDR - How to Scrape Quora
- How To Architect Our Quora Scraper
- Understanding How To Scrape Quora
- Setting Up Our Quora Scraper Project
- Build A Quora Search Crawler
- Build A Quora Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
Need help scraping the web?
Then check out ScrapeOps, the complete toolkit for web scraping.
TLDR - How to Scrape Quora
Need to scrape Quora but don't have time to code? Use the scraper below!
To quickly scrape Quora without coding from scratch, follow the steps below:
- Set up a virtual environment and install Selenium (see the project setup section below).
- Copy the code provided below into a Python file in your project.
- Configure the parameters at the bottom of the script:
  - MAX_THREADS: Controls the number of concurrent threads.
  - MAX_RETRIES: Defines the retry attempts for failed requests.
  - PAGES: Number of Google search result pages to scrape.
  - keyword_list: List of search terms to scrape.
- Run the scraper; the data will be saved to CSV files.
import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Selenium configuration
# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary
# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)
# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
@dataclass
class ReplyData:
name: str = ""
reply: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
logger.info("saving file data")
logger.info(keys)
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)
if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()
with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
except Exception as e:
logger.error(f"Error saving csv {e}")
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):
# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))
# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue
search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")
def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)
# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes
def process_post(row, retries=3):
with webdriver.Chrome(service=service, options=options) as driver:
logger.info(f"Processing row: {row}")
url = row.get("url")
if not url:
logger.error(f"No URL found in row: {row}")
return
logger.info(f"Processing URL: {url}")
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure main content is loaded
wait = WebDriverWait(driver, 10)
main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))
# Extract answer cards
answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
if not answer_cards:
logger.warning(f"No answer cards found at {url}")
# Initialize a new DataPipeline for replies
if 'name' not in row:
logger.error(f"'name' key missing in row: {row}")
break
answer_pipeline = DataPipeline(
csv_filename=f"{row['name'].replace(' ', '-')}.csv"
)
last_seen_name = ""
for answer_card in answer_cards:
try:
name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
name = name_element.text.replace("\n", "").strip()
reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
reply = reply_element.text.strip()
if "Sponsored" in name:
continue
if "Related questions" in name:
break
if name == last_seen_name:
continue
last_seen_name = name
reply_data = ReplyData(name=name, reply=reply)
answer_pipeline.add_data(reply_data)
except Exception as e:
continue
answer_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
def process_results(csv_file, max_threads=5, retries=3):
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
logger.info(f"file opened")
with ThreadPoolExecutor(max_workers=max_threads) as executor:
for row in reader:
executor.submit(process_post, row, retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
How To Architect Our Quora Scraper
Our project consists of two main components: a crawler and a scraper.
- Crawler: It searches Quora through Google, extracts posts, and saves relevant data.
- Scraper: It reads the saved data, visits individual Quora posts, and scrapes detailed content.
Our crawler needs to perform the following actions:
- Perform a search on Quora through Google.
- Parse and extract search results, including pagination.
- Save data (post titles and links) efficiently.
- Execute concurrent searches on multiple result pages.
After the crawl, our scraper will execute these actions:
- Read the saved data from the CSV.
- Visit each individual Quora post and extract relevant information.
- Store extracted data in a structured format.
- Use concurrency to speed up scraping multiple posts.
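The sketch below shows how these two phases connect end to end. It is only an outline; the DataPipeline, start_scrape, and process_results pieces it refers to are built step by step in the rest of this guide.
# Outline of the two-phase job (the referenced functions are built later in this guide)
def run_job(keyword, pages):
    crawl_csv = f"{keyword.replace(' ', '-')}.csv"

    # Phase 1: crawl Google for Quora posts and write titles/URLs to a CSV
    crawl_pipeline = DataPipeline(csv_filename=crawl_csv)
    start_scrape(keyword, pages, data_pipeline=crawl_pipeline)
    crawl_pipeline.close_pipeline()

    # Phase 2: read that CSV and scrape each Quora post into its own CSV of answers
    process_results(crawl_csv)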
Understanding How To Scrape Quora
Scraping Quora is unique compared to scraping other websites due to its heavy use of dynamic content, its anti-bot mechanisms, and its requirement that users be logged in for most interactions.
However, by leveraging Google search to find Quora posts and extracting the content using Selenium, we can bypass the need for an account and scrape publicly available Quora posts indirectly.
Here’s a breakdown of how we scrape Quora:
Step 1: How To Request Quora Pages
Directly scraping Quora pages can be difficult because accessing them usually requires logging in. To circumvent this, we scrape Quora indirectly by querying Google search results for Quora pages.
If we simply search for Quora content on Google, the search results give us a way to find Quora posts without going to the site itself.
Here we query Google with the following structure to find relevant Quora pages:
https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com
Where {formatted_keyword} is the term you're searching for on Quora.
For example, searching for "learn Rust" on Quora via Google would look like this:
https://www.google.com/search?q=learn+rust+site%3Aquora.com
This URL returns search results where the website is limited to quora.com.
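As a quick illustration, here is a small helper that builds this kind of URL from any keyword (a sketch; the crawler later in this guide builds the same URL inline):
# Build a Google search URL restricted to quora.com for a given keyword
def build_search_url(keyword):
    formatted_keyword = keyword.replace(" ", "+")
    return f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com"

print(build_search_url("learn rust"))
# https://www.google.com/search?q=learn+rust%20site%3Aquora.com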
Step 2: How To Extract Data From Quora Results and Pages
Once you have retrieved the Google search results, you will need to extract URLs from the results and scrape the actual Quora pages for detailed answers.
Finding the Correct XPaths or CSS Selectors
You can use Chrome DevTools (right-click on a webpage, then click “Inspect”) to find the correct XPaths or CSS selectors for the elements you want to extract.
1. Find XPath for Quora Search Results:
- For each search result in Google, you’ll need to locate the post titles and URLs.
- To locate the post title in a Google search result, right-click on the title element in Chrome DevTools and select “Copy XPath”.
- This might give you an XPath like:
//*[@id='rso']/div[1]/div/div[1]/a/h3
- Use similar techniques to extract the URL.
2. Find the Main Content and Replies on Quora:
Once you have the Quora post URLs, you’ll need to scrape the actual content and replies on each Quora page. In Quora posts, the answers are often deeply nested within div tags.
Use the following steps to locate elements:
- Inspect the page, and right-click the area containing the answers.
- Copy the CSS selector or XPath of the answer's container. For example, Quora uses a class like q-click-wrapper for answer cards (a short usage sketch follows below).
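To see these selectors in action, here is a minimal sketch that opens a single Quora post and prints a preview of each answer. The mainContent, q-click-wrapper, and spacing_log_answer_content selectors are the ones identified above and may change if Quora updates its markup.
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def preview_answers(driver, post_url):
    # Load the post and wait for the main content container to appear
    driver.get(post_url)
    main_content = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']"))
    )
    # Each answer card sits inside a q-click-wrapper div
    for card in main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper"):
        try:
            reply = card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content").text
            print(reply[:80])  # short preview of the answer text
        except Exception:
            continue  # skip cards without answer content (ads, related questions, etc.)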
Step 3: How To Control Pagination
When scraping multiple Google search result pages, you’ll need to control pagination by updating the search URL’s start parameter.
For example:
- Page 1: Returns the first 10 results.
https://www.google.com/search?q=learn+rust+site%3Aquora.com&start=0
- Page 2: Returns the next 10 results.
https://www.google.com/search?q=learn+rust+site%3Aquora.com&start=10
By incrementing the start value by 10, you can paginate through all the results.
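In code, pagination is just a loop that bumps the start value by 10 for each page, for example:
# Generate the search URLs for the first three result pages
formatted_keyword = "learn rust".replace(" ", "+")
for page_number in range(3):
    start = page_number * 10
    print(f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={start}")
# start=0 -> results 1-10, start=10 -> results 11-20, start=20 -> results 21-30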
Setting Up Our Quora Scraper Project
To get started with scraping Quora using Selenium, follow the steps below to set up the project environment, install dependencies, and configure your WebDriver.
Create a New Project Folder
mkdir quora-scraper
cd quora-scraper
Set Up a Virtual Environment
It's a good practice to isolate your project dependencies using a virtual environment:
python -m venv venv
source venv/bin/activate # On Windows use: venv\Scripts\activate
Install Dependencies
You'll need Selenium for browser automation. WebDriverWait, which we use for explicit waits until an element appears on the page, ships with Selenium, so no extra package is required. Install Selenium using pip:
pip install selenium
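To confirm the install worked, you can print the installed Selenium version (just a quick sanity check, not part of the scraper):
python -c "import selenium; print(selenium.__version__)"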
Download and Set Up ChromeDriver
Selenium requires a WebDriver to interact with the browser. For this project, we are using ChromeDriver.
- Download ChromeDriver:
  - Go to the ChromeDriver download page.
  - Make sure to download the version that matches your installed version of Google Chrome.
- Move ChromeDriver to Project Path:
  - Once downloaded, place chromedriver.exe in your project folder or somewhere accessible in your system’s PATH.
Configure ChromeDriver Path in Code
You’ll need to specify the path to the ChromeDriver in your Python code. Here’s how you can configure the CHROMEDRIVER_PATH and set up the Service:
from selenium.webdriver.chrome.service import Service

CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this if the chromedriver file is not in the current directory
service = Service(CHROMEDRIVER_PATH)
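Before moving on, it can be worth running a quick headless smoke test with the same configuration to confirm that Chrome and ChromeDriver versions match (a minimal check, not part of the scraper itself):
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options

CHROMEDRIVER_PATH = 'chromedriver.exe'  # adjust to wherever you placed ChromeDriver
service = Service(CHROMEDRIVER_PATH)
options = Options()
options.add_argument("--headless")

# Open a page headlessly and print its title; if this works, the driver is set up correctly
with webdriver.Chrome(service=service, options=options) as driver:
    driver.get("https://www.google.com")
    print(driver.title)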
With this setup complete, you’re ready to move on to the next step: building the Quora search crawler.
Build A Quora Search Crawler
Step 1: Create Simple Search Data Parser
Create a parser that extracts Quora post titles and links from Google search results. Use Selenium to select the HTML elements (h3 for titles and a for links).
import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Selenium configuration
# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary
# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)
# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
def scrape_search_results(keyword, data_pipeline=None, retries=3):
# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = 0
logger.info(f"page {page_number}")
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))
# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue
# SearchData and DataPipeline are introduced in Step 3; for now, just print the parsed fields
print("name:", name)
print("link:", link)
print("rank:", result_number + i)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
scrape_search_results(keyword, retries=MAX_RETRIES)
Step 2: Add Pagination
Modify the search URL to paginate through results by adjusting the start
parameter:
result_number = page_number * 10
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
After adding pagination, the code becomes:
import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Selenium configuration
# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary
# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)
# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):
# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))
# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue
# Data storage is added in Step 3; for now, just print the parsed fields
print("name:", name)
print("link:", link)
print("rank:", result_number + i)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")
def start_scrape(keyword, pages, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, pages=PAGES, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
Once the data is extracted from Quora or the Google search results, it is essential to store it efficiently and avoid any duplicates. The storing process involves two classes: SearchData and DataPipeline.
These two classes work together to manage the data, ensure no duplicates are stored, and handle writing the data to CSV files.
Let’s dive into how these classes work and how they facilitate the storage of scraped data.
SearchData Class
The SearchData class represents a single scraped search result from Quora. Each instance of this class stores the name (title), URL, and rank of a Quora search result. Using this class ensures that the scraped data is structured and can be processed systematically.
Here is the structure of the SearchData class:
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
# If the field is a string and is empty, give it a default value
if not value:
setattr(self, field.name, f"No {field.name}")
else:
# Strip leading/trailing whitespace
setattr(self, field.name, value.strip())
- Data Validation (check_string_fields): After initializing the object, the __post_init__ method checks the string fields (name and url) to ensure they are not empty. If a field is empty, it assigns a default value (No {field.name}), making sure that empty data doesn’t enter the pipeline (see the short example below).
- Rank: The rank of each search result is tracked. This is useful for sorting or prioritizing data during analysis.
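For example, constructing a SearchData object with an empty title shows the default-value behaviour (a small illustration, assuming the class definition above):
item = SearchData(name="", url="  https://www.quora.com/example-post  ", rank=3)
print(item.name)  # "No name" -- the empty string was replaced with a default
print(item.url)   # "https://www.quora.com/example-post" -- whitespace stripped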
When scraping search results from Google, each search result is parsed and stored as an instance of SearchData:
search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)
DataPipeline Class
The DataPipeline class is responsible for managing the collected data and writing it to a CSV file. It performs the following tasks:
- Managing a storage queue: Holds the scraped data temporarily before writing it to the CSV file.
- Checking for duplicates: Prevents duplicate entries based on the name field.
- Saving to a CSV file: Writes the data to a CSV file once the storage queue reaches the defined limit or when the process ends.
Here’s a detailed breakdown of the DataPipeline class:
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = [] # Track names to avoid duplicates
self.storage_queue = [] # Temporary storage for scraped data
self.storage_queue_limit = storage_queue_limit # Limit before writing to CSV
self.csv_filename = csv_filename # Name of the CSV file
self.csv_file_open = False # Check if the file is open
def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear() # Clear the queue after copying
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
# Ensure the CSV filename is valid
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
# Write the header if the file does not exist
if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()
# Append the data to the CSV
with open(valid_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
except Exception as e:
logger.error(f"Error saving CSV: {e}")
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
- Queue-based Storage (storage_queue): Scraped data is first stored in a temporary queue. Once the queue reaches the defined storage_queue_limit (e.g., 50 entries), the data is saved to a CSV file. This avoids frequent I/O operations and optimizes performance.
- Duplicate Handling (is_duplicate): Before adding new data to the queue, the is_duplicate method checks whether the data already exists by comparing the name field. If a duplicate is found, it logs a warning and skips the entry.
- CSV File Writing (save_to_csv): When the queue is full or when the scraping process is complete, the save_to_csv method is called to write the collected data to a CSV file. It also ensures that the filename is valid and does not contain any illegal characters.
- Closing the Pipeline (close_pipeline): When scraping is finished, the close_pipeline method ensures that any remaining data in the queue is written to the CSV file.
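Putting the two classes together, a minimal usage example looks like this (illustrative values only; the crawler below wires this up for you):
pipeline = DataPipeline(csv_filename="learn-rust.csv", storage_queue_limit=50)

# Duplicate names are dropped; everything else is queued and flushed to the CSV
pipeline.add_data(SearchData(name="How do I learn Rust?", url="https://www.quora.com/example", rank=1))
pipeline.add_data(SearchData(name="How do I learn Rust?", url="https://www.quora.com/example", rank=2))  # dropped as duplicate

pipeline.close_pipeline()  # writes any remaining queued rows to learn-rust.csv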
After adding these classes and creating a DataPipeline in main, the code becomes:
import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Selenium configuration
# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary
# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)
# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)
if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()
with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
except Exception as e:
logger.error(f"Error saving csv {e}")
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):
# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))
# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue
search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")
def start_scrape(keyword, pages, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, page, data_pipeline=data_pipeline, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
Step 4: Adding Concurrency
Concurrency is essential when scraping large amounts of data. If every page is processed one after another, a single slow or failed page holds up everything behind it.
Running the page scrapes concurrently distributes the work across multiple threads, saving time and improving efficiency.
Use ThreadPoolExecutor to run concurrent scraping on multiple pages:
with ThreadPoolExecutor(max_workers=5) as executor:
executor.submit(scrape_search_results, keyword, page_number)
The start_scrape function would become:
def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)
# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes
The full code would be:
import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Selenium configuration
# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary
# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)
# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)
if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()
with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
except Exception as e:
logger.error(f"Error saving csv {e}")
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):
# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))
# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue
search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")
def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)
# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
Step 5: Production Run
Once the crawler is complete, set PAGES to the desired number and initiate a production run. Tweak the following constants as needed:
MAX_THREADS = 5
PAGES = 5
The main function would be:
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
## Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
If everything goes well, the final result should be like this:
[Screenshot: CSV output produced by the crawl]
Crawling the results from Google took 16.469 seconds for 5 pages, i.e. 16.469 / 5 = 3.2938 seconds per page.
Build A Quora Scraper
Now, to scrape the answers from Quora, we will take the following steps:
- Read the CSV file generated from the Google search results.
- Open each post listed in the CSV file and parse the answer data from the post pages.
- Store the data.
- Add concurrency to steps 2 and 3 so multiple posts are processed at the same time.
- Run the scraper.
Step 1: Create Simple Answer Data Parser
The goal of this step is to scrape each Quora post and extract the main content, specifically the answers and relevant replies, while filtering out non-relevant data such as promoted or related responses.
The process_post function is responsible for visiting a Quora post, waiting for the content to load, and then extracting the answers. It uses Selenium to interact with the dynamically loaded elements on the Quora page.
Here's how it works:
def process_post(row, retries=3):
with webdriver.Chrome(service=service, options=options) as driver:
logger.info(f"Processing row: {row}")
url = row.get("url")
if not url:
logger.error(f"No URL found in row: {row}")
return
success = False
tries = 0
while tries < retries and not success:
try:
# Step 1: Open the URL and wait for the main content to load
driver.get(url)
logger.info(f"Accessing {url}")
wait = WebDriverWait(driver, 10)
main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))
# Step 2: Extract answer cards
answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
if not answer_cards:
logger.warning(f"No answer cards found at {url}")
# Step 3: Initialize a DataPipeline to store replies
if 'name' not in row:
logger.error(f"'name' key missing in row: {row}")
break
last_seen_name = ""
# Step 4: Loop through each answer card and extract name and reply
for answer_card in answer_cards:
try:
name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
name = name_element.text.replace("\n", "").strip()
reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
reply = reply_element.text.strip()
# Filter out promoted content and related questions
if "Sponsored" in name:
continue
if "Related questions" in name:
break
if name == last_seen_name:
continue
last_seen_name = name
print("name:", name)
print("reply:", reply)
except Exception as e:
continue
success = True
except Exception as e:
logger.error(f"Exception thrown while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
- Extracting Answers: Uses Selenium to wait for and locate the main content of a Quora post. It then extracts each individual answer (skipping promoted or irrelevant content) and stores it.
- Retries: Includes a retry mechanism to handle temporary failures, such as page load errors (a stripped-down version of this pattern is shown below).
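The retry logic follows a simple pattern that is worth knowing on its own; a stripped-down version, independent of the Quora-specific code, might look like this:
import time

def with_retries(action, retries=3, wait_seconds=2):
    # Run `action` until it succeeds or `retries` attempts are exhausted
    tries = 0
    while tries < retries:
        try:
            return action()
        except Exception:
            tries += 1
            if tries >= retries:
                raise  # give up after the final attempt
            time.sleep(wait_seconds)  # brief pause before the next attempt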
Step 2: Loading URLs To Scrape
Once you have scraped URLs from Google search results, you need to load these URLs into the scraper to process each Quora post.
This is handled by the process_results function, which loads URLs from a CSV file and calls process_post to scrape each post.
def process_results(csv_file, max_threads=5, retries=3):
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
logger.info(f"file opened")
for row in reader:
process_post(row, retries)
- Reading URLs: This function opens the CSV file generated by the search crawler, reads the URLs of the Quora posts, and processes them sequentially.
- Processing Each Post: For each URL, it calls the process_post function to scrape the content.
The full code would be:
import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Selenium configuration
# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary
# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)
# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)
if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()
with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
except Exception as e:
logger.error(f"Error saving csv {e}")
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):
# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))
# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue
search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")
def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)
# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes
def process_post(row, retries=3):
with webdriver.Chrome(service=service, options=options) as driver:
logger.info(f"Processing row: {row}")
url = row.get("url")
if not url:
logger.error(f"No URL found in row: {row}")
return
success = False
tries = 0
while tries < retries and not success:
try:
# Step 1: Open the URL and wait for the main content to load
driver.get(url)
logger.info(f"Accessing {url}")
wait = WebDriverWait(driver, 10)
main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))
# Step 2: Extract answer cards
answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
if not answer_cards:
logger.warning(f"No answer cards found at {url}")
# Step 3: Initialize a DataPipeline to store replies
if 'name' not in row:
logger.error(f"'name' key missing in row: {row}")
break
last_seen_name = ""
# Step 4: Loop through each answer card and extract name and reply
for answer_card in answer_cards:
try:
name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
name = name_element.text.replace("\n", "").strip()
reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
reply = reply_element.text.strip()
# Filter out promoted content and related questions
if "Sponsored" in name:
continue
if "Related questions" in name:
break
if name == last_seen_name:
continue
last_seen_name = name
print("name:", name)
print("reply:", reply)
except Exception as e:
continue
success = True
except Exception as e:
logger.error(f"Exception thrown while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
def process_results(csv_file, max_threads=5, retries=3):
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
logger.info(f"file opened")
for row in reader:
process_post(row, retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 3: Storing the Scraped Data
The extracted data (such as answers and user names) is stored in a CSV file using the ReplyData class and the DataPipeline class.
Each scraped answer is stored as an instance of ReplyData, ensuring that the scraped content is well-structured.
ReplyData Class:
@dataclass
class ReplyData:
name: str = ""
reply: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
- Data Structuring: The ReplyData class is used to store the name of the user and the content of their reply in a structured format.
- Field Validation: The check_string_fields method ensures that empty or malformed strings are handled by assigning a default value or removing unnecessary whitespace.
Each instance of ReplyData is passed to the DataPipeline class for storage in a CSV file, as shown in the short example below.
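Inside the answer loop, each extracted answer ends up in a per-post pipeline roughly like this (a condensed illustration of the full code below):
answer_pipeline = DataPipeline(csv_filename="How-do-I-learn-Rust.csv")
answer_pipeline.add_data(ReplyData(name="Example User", reply="Start with the official Rust book..."))
answer_pipeline.close_pipeline()  # flushes the replies to the per-post CSV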
The full code would be:
import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Selenium configuration
# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary
# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)
# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
options.headless = True # Runs Chrome in headless mode (without GUI)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
@dataclass
class ReplyData:
name: str = ""
reply: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)
if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()
with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
except Exception as e:
logger.error(f"Error saving csv {e}")
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):
# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))
# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue
search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")
def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)
# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes
def process_post(row, retries=3):
with webdriver.Chrome(service=service, options=options) as driver:
logger.info(f"Processing row: {row}")
url = row.get("url")
if not url:
logger.error(f"No URL found in row: {row}")
return
logger.info(f"Processing URL: {url}")
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure main content is loaded
wait = WebDriverWait(driver, 10)
main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))
# Extract answer cards
answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
if not answer_cards:
logger.warning(f"No answer cards found at {url}")
# Initialize a new DataPipeline for replies
if 'name' not in row:
logger.error(f"'name' key missing in row: {row}")
break
answer_pipeline = DataPipeline(
csv_filename=f"{row['name'].replace(' ', '-')}.csv"
)
last_seen_name = ""
for answer_card in answer_cards:
try:
name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
name = name_element.text.replace("\n", "").strip()
reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
reply = reply_element.text.strip()
if "Sponsored" in name:
continue
if "Related questions" in name:
break
if name == last_seen_name:
continue
last_seen_name = name
reply_data = ReplyData(name=name, reply=reply)
answer_pipeline.add_data(reply_data)
except Exception as e:
continue
answer_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
def process_results(csv_file, max_threads=5, retries=3):
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
logger.info(f"file opened")
for row in reader:
process_post(row, retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 4: Adding Concurrency
To scrape multiple Quora posts concurrently and improve efficiency, you can modify the `process_results` function to use `ThreadPoolExecutor`. This allows the scraper to handle multiple posts at once, significantly speeding up the process.
from concurrent.futures import ThreadPoolExecutor
def process_results(csv_file, max_threads=5, retries=3):
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
logger.info(f"file opened")
with ThreadPoolExecutor(max_workers=max_threads) as executor:
for row in reader:
executor.submit(process_post, row, retries)
- Threading: `ThreadPoolExecutor` runs a pool of worker threads, allowing the scraper to process several Quora posts simultaneously.
- Concurrency: The `max_workers` parameter defines the number of threads running concurrently. Each thread calls the `process_post` function to handle a single Quora post (see the optional variant sketched below for surfacing worker errors).
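One caveat with `executor.submit` is that any exception raised inside `process_post` is swallowed unless you inspect the returned futures. If you want failures to show up in your logs, a slightly more defensive variant (an optional tweak, not required by the rest of the tutorial) collects the futures and calls `result()` on each:

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_results(csv_file, max_threads=5, retries=3):
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
    logger.info(f"Opened {csv_file} with {len(reader)} rows")

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = [executor.submit(process_post, row, retries) for row in reader]
        for future in as_completed(futures):
            try:
                # result() re-raises any exception thrown inside process_post,
                # so failures are logged instead of disappearing silently.
                future.result()
            except Exception as e:
                logger.error(f"Worker failed: {e}")

The full code below keeps the simpler `submit`-only version.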
The full code would be:
import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Selenium configuration
# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary
# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)
# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
options.headless = True # Runs Chrome in headless mode (without GUI)
@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
@dataclass
class ReplyData:
name: str = ""
reply: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)
if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()
with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
except Exception as e:
logger.error(f"Error saving csv {e}")
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()
def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):
# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))
# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue
search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")
def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)
# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes
def process_post(row, retries=3):
with webdriver.Chrome(service=service, options=options) as driver:
logger.info(f"Processing row: {row}")
url = row.get("url")
if not url:
logger.error(f"No URL found in row: {row}")
return
logger.info(f"Processing URL: {url}")
success = False
tries = 0
while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")
# Use explicit wait to ensure main content is loaded
wait = WebDriverWait(driver, 10)
main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))
# Extract answer cards
answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
if not answer_cards:
logger.warning(f"No answer cards found at {url}")
# Initialize a new DataPipeline for replies
if 'name' not in row:
logger.error(f"'name' key missing in row: {row}")
break
answer_pipeline = DataPipeline(
csv_filename=f"{row['name'].replace(' ', '-')}.csv"
)
last_seen_name = ""
for answer_card in answer_cards:
try:
name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
name = name_element.text.replace("\n", "").strip()
reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
reply = reply_element.text.strip()
if "Sponsored" in name:
continue
if "Related questions" in name:
break
if name == last_seen_name:
continue
last_seen_name = name
reply_data = ReplyData(name=name, reply=reply)
answer_pipeline.add_data(reply_data)
except Exception as e:
continue
answer_pipeline.close_pipeline()
success = True
except Exception as e:
logger.error(f"Exception thrown while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)
def process_results(csv_file, max_threads=5, retries=3):
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
logger.info(f"file opened")
with ThreadPoolExecutor(max_workers=max_threads) as executor:
for row in reader:
executor.submit(process_post, row, retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Step 5: Production Run
Finally, when your scraper is ready to run on a larger dataset, you can execute the full scraping process. This includes scraping search results for multiple keywords, storing the data in CSV files, and processing the scraped URLs concurrently.
The `__main__` block would be:
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes: Scraping Search Results
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
# Processing Scraped Quora Posts
for file in aggregate_files:
process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
- Scraping Search Results: For each keyword in the `keyword_list`, the scraper collects search results from Google and stores them in a CSV file.
- Concurrent Processing: After collecting the URLs, the `process_results` function processes each Quora post concurrently, using multiple threads for efficiency.
- Parameters: You can adjust `MAX_RETRIES`, `MAX_THREADS`, and `PAGES` to fine-tune the scraper's performance. More threads will increase speed, but be mindful of server load and anti-bot measures. An example of tuned settings is shown below.
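For example, a bigger run might use settings like these (the values and extra keywords are illustrative, not recommendations from the guide):

# Illustrative production settings -- tune to your machine and proxy setup.
MAX_RETRIES = 5          # flaky pages get a few more attempts
MAX_THREADS = 4          # fewer threads is gentler on Google and Quora
PAGES = 10               # roughly 100 Google results per keyword
keyword_list = ["learn rust", "rust vs go", "rust web frameworks"]
# The crawl and processing loops from the main block above stay exactly the same.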
After running the code, if everything runs fine, you will get the following results:
[Screenshot 8: production run results]
The full run took 651.258 seconds, of which 16.469 seconds were spent crawling the Google results pages. That leaves 651.258 - 16.469 = 634.789 seconds for scraping the Quora posts themselves. Since we scraped 50 posts, that works out to 634.789 / 50 = 12.695 seconds per post.
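If you want to reproduce this kind of measurement yourself, one way (a sketch of ours, not something the scraper requires) is to wrap each phase of the main block with `time.time()`:

import time

crawl_start = time.time()
for keyword in keyword_list:
    filename = keyword.replace(" ", "-")
    crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
    start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline,
                 max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    aggregate_files.append(f"{filename}.csv")
crawl_time = time.time() - crawl_start       # time spent on Google results

scrape_start = time.time()
for file in aggregate_files:
    process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
scrape_time = time.time() - scrape_start     # time spent on the Quora posts

posts_scraped = 50  # set this to the number of rows in your crawl CSVs
logger.info(f"Crawl: {crawl_time:.3f}s, scrape: {scrape_time:.3f}s, "
            f"average {scrape_time / posts_scraped:.3f}s per post")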
Legal and Ethical Considerations
When scraping the web, you need to pay attention to your target site's Terms of Service and its `robots.txt` file. Legal or not, when you violate a site's terms, you can get suspended or even permanently banned.
Public data is typically free to scrape, but be cautious when dealing with private or gated content.
When scraping Quora, be mindful of their Terms of Service and review their `robots.txt` file. Ensure that your scraping activities do not violate legal or ethical guidelines.
When scraping private data, you are subject to the site's terms and the privacy laws of the site's jurisdiction. If you're not sure whether your scraper is legal, consult an attorney.
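If you want a quick programmatic sanity check (not legal advice), Python's standard-library `urllib.robotparser` can read Quora's `robots.txt` and tell you whether a given path is disallowed for your user agent; the example URL below is just a placeholder:

from urllib.robotparser import RobotFileParser

robots = RobotFileParser()
robots.set_url("https://www.quora.com/robots.txt")
robots.read()

url = "https://www.quora.com/How-do-I-learn-Rust"
# can_fetch() reports whether the given user agent may crawl the URL
if robots.can_fetch("*", url):
    print("Allowed by robots.txt")
else:
    print("Disallowed by robots.txt -- skip this URL")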
Conclusion
This guide walked you through building a robust Quora scraper using Python and Selenium. With scraping logic, pagination, and concurrency in place, you're now equipped to scrape Quora effectively. Be sure to follow ethical guidelines and monitor your scraper's performance.
If you'd like to learn more about the tech stack used in this article, check out the links below.
More Python Web Scraping Guides
Here at ScrapeOps, we've got a ton of learning resources. Whether you're brand new or a seasoned web developer, we've got something for you.
Check out our extensive Selenium Web Scraping Playbook and build something!
If you'd like to learn more from our "How To Scrape" series, take a look at the links below.