

How to Scrape Quora With Selenium

Quora is a popular question-and-answer platform, housing valuable information across various topics. Since its launch in 2009, Quora has served as a go-to destination for people looking to ask questions and get insightful answers from a global community of experts. This data can be scraped and analyzed to understand trends, explore customer pain points, and gain insights into market opportunities.

In this tutorial, we will learn how to build a Quora scraper using Selenium.

Need help scraping the web?

Then check out ScrapeOps, the complete toolkit for web scraping.


TLDR - How to Scrape Quora

Need to scrape Quora but don't have time to code? Use the scraper below!

To quickly scrape Quora without coding from scratch, follow the steps below:

  1. Create a new Python file and paste in the code provided below.

  2. You can configure various parameters:

    • MAX_THREADS: Controls the number of concurrent threads.
    • MAX_RETRIES: Defines the retry attempts for failed requests.
    • PAGES: Number of Google search result pages to scrape.
    • LOCATION: Geographical region for search results (not used by the basic script below).
    • keyword_list: List of search terms to scrape.
  3. Create a virtual environment and install the necessary libraries (see the command sequence below).

  4. After setting up, run the scraper, and data will be saved to CSV files.
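For reference, a minimal setup-and-run sequence looks like this (assuming you save the code below as quora_scraper.py, a filename chosen here purely for illustration):

mkdir quora-scraper
cd quora-scraper
python -m venv venv
source venv/bin/activate  # On Windows use: venv\Scripts\activate
pip install selenium
python quora_scraper.py   # runs the scraper saved from the code below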

import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Selenium configuration

# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary

# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)

# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
options.headless = True # Runs Chrome in headless mode (without GUI)

@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())


@dataclass
class ReplyData:
name: str = ""
reply: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())


class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
logger.info("saving file data")
logger.info(keys)
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)

if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()

with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)


for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

except Exception as e:
logger.error(f"Error saving csv {e}")

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):
# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")

url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0

while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")

# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))

# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue

search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)

data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")

def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)

# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes

def process_post(row, retries=3):
with webdriver.Chrome(service=service, options=options) as driver:
logger.info(f"Processing row: {row}")
url = row.get("url")
if not url:
logger.error(f"No URL found in row: {row}")
return
logger.info(f"Processing URL: {url}")
success = False
tries = 0

while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")

# Use explicit wait to ensure main content is loaded
wait = WebDriverWait(driver, 10)
main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))

# Extract answer cards
answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
if not answer_cards:
logger.warning(f"No answer cards found at {url}")

# Initialize a new DataPipeline for replies
if 'name' not in row:
logger.error(f"'name' key missing in row: {row}")
break

answer_pipeline = DataPipeline(
csv_filename=f"{row['name'].replace(' ', '-')}.csv"
)
last_seen_name = ""

for answer_card in answer_cards:
try:
name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
name = name_element.text.replace("\n", "").strip()
reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
reply = reply_element.text.strip()

if "Sponsored" in name:
continue
if "Related questions" in name:
break
if name == last_seen_name:
continue
last_seen_name = name

reply_data = ReplyData(name=name, reply=reply)
answer_pipeline.add_data(reply_data)
except Exception as e:
continue

answer_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

def process_results(csv_file, max_threads=5, retries=3):
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
logger.info(f"file opened")

with ThreadPoolExecutor(max_workers=max_threads) as executor:
for row in reader:
executor.submit(process_post, row, retries)




if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)


How To Architect Our Quora Scraper

Our project consists of two main components: a crawler and a scraper.

  • Crawler: It searches Quora through Google, extracts posts, and saves relevant data.
  • Scraper: It reads the saved data, visits individual Quora posts, and scrapes detailed content.

Our crawler needs to perform the following actions:

  1. Perform a search on Quora through Google.
  2. Parse and extract search results, including pagination.
  3. Save data (post titles and links) efficiently.
  4. Execute concurrent searches on multiple result pages.

After the crawl, our scraper will execute these actions:

  1. Read the saved data from the CSV.
  2. Visit each individual Quora post and extract relevant information.
  3. Store extracted data in a structured format.
  4. Use concurrency to speed up scraping multiple posts.

Understanding How To Scrape Quora

Scraping Quora is unique compared to scraping other websites due to its heavy use of dynamic content, its anti-bot mechanisms, and its requirement that users log in for most interactions.

Quora Login Modal

However, by leveraging Google search to find Quora posts and extracting the content using Selenium, we can bypass the need for an account and scrape publicly available Quora posts indirectly.

Here’s a breakdown of how we scrape Quora:


Step 1: How To Request Quora Pages

Directly scraping Quora pages can be difficult because accessing them usually requires logging in. To circumvent this, we scrape Quora indirectly by querying Google search results for Quora pages.

If we simply search Quora's content through Google, we can read the posts we need without ever visiting the site directly.

Google Search Results Quora

Here we query Google with the following structure to find relevant Quora pages:

https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com

Where {formatted_keyword} is the term you're searching for on Quora.

For example, searching for "learn Rust" on Quora via Google would look like this:

https://www.google.com/search?q=learn+rust+site%3Aquora.com

Google Search Results Page

This URL returns search results that are restricted to quora.com.
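If you prefer to build this URL programmatically, a small helper along these lines keeps the keyword properly URL-encoded (a sketch; build_google_quora_url is an illustrative name, not part of the scraper below, which simply replaces spaces with +):

from urllib.parse import quote_plus

def build_google_quora_url(keyword, page=0):
    """Build a Google search URL restricted to quora.com for a given results page."""
    query = quote_plus(keyword)  # URL-encode the keyword (spaces become '+')
    start = page * 10            # Google returns 10 results per page
    return f"https://www.google.com/search?q={query}%20site%3Aquora.com&start={start}"

print(build_google_quora_url("learn rust"))     # first page
print(build_google_quora_url("learn rust", 1))  # second page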


Step 2: How To Extract Data From Quora Results and Pages

Once you have retrieved the Google search results, you will need to extract URLs from the results and scrape the actual Quora pages for detailed answers.

Finding the Correct XPaths or CSS Selectors

You can use Chrome DevTools (right-click on a webpage, then click “Inspect”) to find the correct XPaths or CSS selectors for the elements you want to extract.

1. Find XPath for Quora Search Results:

  • For each search result in Google, you’ll need to locate the post titles and URLs.
  • To locate the post title in a Google search result, right-click on the title element in Chrome DevTools and select “Copy” → “Copy XPath”.
  • This might give you an XPath like:
    //*[@id='rso']/div[1]/div/div[1]/a/h3
  • Use similar techniques to extract the URL.

Screenshot 4
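Putting that together, a minimal sketch of pulling the first result's title and URL looks like the snippet below. The XPaths are the ones used later in this tutorial; Google's markup (and consent pages) changes frequently, so treat them as fragile.

from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()  # assumes chromedriver is on your PATH
driver.get("https://www.google.com/search?q=learn+rust+site%3Aquora.com")

i = 1  # first organic result
try:
    # Primary XPath used by most result cards
    title = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
    url = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception:
    # Fallback XPath for the alternate card layout
    title = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
    url = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")

print(title, url)
driver.quit()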

2. Find the Main Content and Replies on Quora:

Once you have the Quora post URLs, you’ll need to scrape the actual content and replies on each Quora page. In Quora posts, the answers are often deeply nested within div tags.

Use the following steps to locate elements:

  • Inspect the page, and right-click the area containing the answers.
  • Copy the CSS selector or XPath of the answer's container. For example, Quora uses a class like q-click-wrapper for answers.

Screenshot 5
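As a quick sanity check, a minimal sketch of that lookup is shown below. The selectors (mainContent, q-click-wrapper) are the ones used by this tutorial's scraper and may change as Quora updates its markup; the post URL is just an illustrative example.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()  # assumes chromedriver is on your PATH
driver.get("https://www.quora.com/How-do-I-learn-Rust")  # illustrative post URL

# Wait for the main content container, then collect the answer cards
main_content = WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']"))
)
answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
print(f"Found {len(answer_cards)} answer cards")
driver.quit()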


Step 3: How To Control Pagination

When scraping multiple Google search result pages, you’ll need to control pagination by updating the search URL’s start parameter.

For example:

  • Page 1: Returns the first 10 results.
    https://www.google.com/search?q=learn+rust+site%3Aquora.com&start=0
  • Page 2: Returns the next 10 results.
    https://www.google.com/search?q=learn+rust+site%3Aquora.com&start=10

By incrementing the start value by 10, you can paginate through all the results.
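In code, pagination simply means deriving start from a page counter, as the crawler below does:

# Sketch: generate the paginated search URLs for the first few result pages
keyword = "learn rust"
pages = 3  # number of pages to generate, chosen here for illustration

for page in range(pages):
    start = page * 10  # Google's `start` parameter advances in steps of 10
    url = f"https://www.google.com/search?q={keyword.replace(' ', '+')}%20site%3Aquora.com&start={start}"
    print(url)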


Setting Up Our Quora Scraper Project

To get started with scraping Quora using Selenium and ScrapeOps, follow the steps below to set up the project environment, install dependencies, and configure your WebDriver.

Create a New Project Folder

mkdir quora-scraper
cd quora-scraper

Set Up a Virtual Environment

It's a good practice to isolate your project dependencies using a virtual environment:

python -m venv venv
source venv/bin/activate # On Windows use: venv\Scripts\activate

Install Dependencies

You'll need Selenium for browser automation. WebDriverWait (used for explicit waits until an element appears) is part of the Selenium package, so it does not need to be installed separately. Install Selenium using pip:

pip install selenium

Download and Set Up ChromeDriver

Selenium requires a WebDriver to interact with the browser. For this project, we are using ChromeDriver.

  1. Download ChromeDriver:

    • Download the ChromeDriver version that matches your installed Chrome browser from the official ChromeDriver download page.

  2. Move ChromeDriver to Project Path:

    • Once downloaded, place chromedriver.exe in your project folder or somewhere accessible in your system’s PATH.

Configure ChromeDriver Path in Code

You’ll need to specify the path to the ChromeDriver in your Python code. Here’s how you can configure the CHROMEDRIVER_PATH and set up the Service:

CHROMEDRIVER_PATH = 'chromedriver.exe'  # Adjust this to the actual path if chromedriver file is not in the current directory
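Before writing any scraping logic, it can help to run a quick smoke test to confirm the driver is wired up correctly. A minimal sketch, assuming chromedriver.exe sits next to your script:

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options

CHROMEDRIVER_PATH = 'chromedriver.exe'  # adjust if the driver lives elsewhere

options = Options()
options.add_argument("--headless")

service = Service(CHROMEDRIVER_PATH)
with webdriver.Chrome(service=service, options=options) as driver:
    driver.get("https://www.google.com")
    print(driver.title)  # should print "Google" if the setup works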

With this setup complete, you’re ready to move on to the next step: building the Quora search crawler.


Build A Quora Search Crawler


Step 1: Create Simple Search Data Parser

Create a parser that extracts Quora post titles and links from Google search results. Use Selenium to select the HTML elements (h3 for titles and a for links).

import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC



# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Selenium configuration

# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary

# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)

# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
options.headless = True # Runs Chrome in headless mode (without GUI)

def scrape_search_results(keyword, data_pipeline=None, retries=3):
    # Use a context manager to ensure the driver is properly closed
    with webdriver.Chrome(service=service, options=options) as driver:
        formatted_keyword = keyword.replace(" ", "+")
        result_number = 0  # First page of results
        logger.info("page 0")

        url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
        success = False
        tries = 0

        while tries < retries and not success:
            try:
                driver.get(url)
                logger.info(f"Accessing {url}")

                # Use explicit wait to ensure elements are loaded
                wait = WebDriverWait(driver, 10)
                wait.until(EC.presence_of_element_located((By.ID, "rso")))

                # Extract search result cards
                for i in range(1, 11):
                    try:
                        # Attempt primary XPath
                        name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
                        link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
                    except:
                        try:
                            # Fallback XPath
                            name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
                            link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
                        except Exception as e:
                            continue

                    search_data = SearchData(
                        name=name,
                        url=link,
                        rank=result_number + i  # Increment rank per result
                    )

                    data_pipeline.add_data(search_data)

                logger.info(f"Successfully parsed data from: {url}")
                success = True

            except Exception as e:
                logger.error(f"An error occurred while processing {url}: {e}")
                tries += 1
                if tries >= retries:
                    logger.error(f"Max retries exceeded for {url}")
                else:
                    logger.info(f"Retrying {url} ({tries}/{retries})")
                    time.sleep(2)

        logger.info(f"Storage queue length after page 0: {len(data_pipeline.storage_queue)}")


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    logger.info("Crawl starting...")
    # INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []
    # Job Processes
    # Note: this snippet relies on the SearchData and DataPipeline classes introduced in
    # Step 3; a DataPipeline must be passed in for the results to be stored.
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        scrape_search_results(keyword, retries=MAX_RETRIES)

Step 2: Add Pagination

Modify the search URL to paginate through results by adjusting the start parameter:

result_number = page_number * 10
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"

After adding pagination, the code looks like this:

import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC



# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Selenium configuration

# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary

# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)

# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
options.headless = True # Runs Chrome in headless mode (without GUI)

def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):
# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")

url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0

while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")

# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))

# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue

search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)

data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")

def start_scrape(keyword, pages, data_pipeline=None, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, page, data_pipeline=data_pipeline, retries=retries)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
start_scrape(keyword, pages=PAGES, retries=MAX_RETRIES)

Step 3: Storing the Scraped Data

Once the data is extracted from Quora or the Google search results, it is essential to store it efficiently and avoid any duplicates. The storing process involves two classes: SearchData and DataPipeline.

These two classes work together to manage the data, ensure no duplicates are stored, and handle writing the data to CSV files.

Let’s dive into how these classes work and how they facilitate the storage of scraped data.

SearchData Class

The SearchData class represents a single scraped search result from Quora. Each instance of this class stores the name (title), URL, and rank of a Quora search result. Using this class ensures that the scraped data is structured and can be processed systematically.

Here is the structure of the SearchData class:

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    rank: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if isinstance(value, str):
                # If the field is a string and is empty, give it a default value
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    # Strip leading/trailing whitespace
                    setattr(self, field.name, value.strip())
  • Data Validation (check_string_fields): After initializing the object, the __post_init__ method checks the string fields (name and url) to ensure they are not empty. If a field is empty, it assigns a default value (No {field.name}), making sure that empty data doesn’t enter the pipeline.
  • Rank: The rank of each search result is tracked. This is useful for sorting or prioritizing data during analysis.
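For instance, constructing a SearchData instance with an empty name (a toy example, not part of the crawler) shows the default-fill and whitespace-stripping behavior:

# Toy example using the SearchData class defined above
item = SearchData(name="", url="  https://www.quora.com/How-do-I-learn-Rust  ", rank=1)
print(item.name)  # -> "No name"
print(item.url)   # -> "https://www.quora.com/How-do-I-learn-Rust" (whitespace stripped)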

When scraping search results from Google, each search result is parsed and stored as an instance of SearchData:

search_data = SearchData(
    name=name,
    url=link,
    rank=result_number + i  # Increment rank per result
)

DataPipeline Class

The DataPipeline class is responsible for managing the collected data and writing it to a CSV file. It performs the following tasks:

  • Managing a storage queue: Holds the scraped data temporarily before writing it to the CSV file.
  • Checking for duplicates: Prevents duplicate entries based on the name field.
  • Saving to a CSV file: Writes the data to a CSV file once the storage queue reaches the defined limit or when the process ends.

Here’s the detailed breakdown of the DataPipeline class:

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []  # Track names to avoid duplicates
        self.storage_queue = []  # Temporary storage for scraped data
        self.storage_queue_limit = storage_queue_limit  # Limit before writing to CSV
        self.csv_filename = csv_filename  # Name of the CSV file
        self.csv_file_open = False  # Check if the file is open

    def save_to_csv(self):
        try:
            self.csv_file_open = True
            data_to_save = self.storage_queue.copy()
            self.storage_queue.clear()  # Clear the queue after copying
            if not data_to_save:
                return

            keys = [field.name for field in fields(data_to_save[0])]
            # Ensure the CSV filename is valid
            valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
            valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
            logger.info(valid_filename)
            file_exists = os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0

            # Write the header if the file does not exist
            if not file_exists:
                with open(valid_filename, 'w', newline='') as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    writer.writeheader()

            # Append the data to the CSV
            with open(valid_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                for item in data_to_save:
                    writer.writerow(asdict(item))

            self.csv_file_open = False

        except Exception as e:
            logger.error(f"Error saving CSV: {e}")

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
  • Queue-based Storage (storage_queue): Scraped data is first stored in a temporary queue. Once the queue reaches the defined storage_queue_limit (e.g., 50 entries), the data is saved to a CSV file. This avoids frequent I/O operations and optimizes performance.

  • Duplicate Handling (is_duplicate): Before adding new data to the queue, the is_duplicate method checks whether the data already exists by comparing the name field. If a duplicate is found, it logs a warning and skips the entry.

  • CSV File Writing (save_to_csv): When the queue is full or when the scraping process is complete, the save_to_csv method is called to write the collected data to a CSV file. It also ensures that the filename is valid and does not contain any illegal characters.

  • Closing the Pipeline (close_pipeline): When scraping is finished, the close_pipeline method ensures that any remaining data in the queue is written to the CSV file.
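Used on its own, the pipeline works like this (a small illustrative example with a low queue limit so the flush is easy to see; the URLs are made up):

# Illustrative usage of DataPipeline with SearchData (both defined above)
pipeline = DataPipeline(csv_filename="example.csv", storage_queue_limit=2)

pipeline.add_data(SearchData(name="How do I learn Rust?", url="https://www.quora.com/How-do-I-learn-Rust", rank=1))
pipeline.add_data(SearchData(name="How do I learn Rust?", url="https://www.quora.com/How-do-I-learn-Rust", rank=2))  # dropped as a duplicate name
pipeline.add_data(SearchData(name="Is Rust hard to learn?", url="https://www.quora.com/Is-Rust-hard-to-learn", rank=3))

pipeline.close_pipeline()  # flushes anything still in the queue to example.csv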

After adding these classes and creating a DataPipeline in the main block, the code looks like this:

import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC



# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Selenium configuration

# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary

# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)

# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
options.headless = True # Runs Chrome in headless mode (without GUI)

@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)

if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()

with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)


for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

except Exception as e:
logger.error(f"Error saving csv {e}")

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()


def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):

# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")

url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0

while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")

# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))

# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue

search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)

data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")


def start_scrape(keyword, pages, data_pipeline=None, retries=3):
for page in range(pages):
scrape_search_results(keyword, page, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")



Step 4: Adding Concurrency

Concurrency is essential when scraping large amounts of data. If everything runs as a single sequential chain of events, one slow or failed page holds up the entire process.

Concurrency distributes the work across multiple threads that run simultaneously, saving time and improving efficiency.

Use ThreadPoolExecutor to run concurrent scraping on multiple pages:

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.submit(scrape_search_results, keyword, page_number)

The start_scrape function would become:


def start_scrape(
    keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = []
        for page in range(pages):
            # No need to pass the driver anymore, each thread will create its own
            futures.append(
                executor.submit(
                    scrape_search_results,
                    keyword,
                    page,
                    data_pipeline,
                    retries,
                )
            )

        # Ensure all threads complete
        for future in futures:
            future.result()  # This blocks until the thread finishes

The full code would be:

import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC



# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Selenium configuration

# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary

# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)

# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
options.headless = True # Runs Chrome in headless mode (without GUI)

@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)

if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()

with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)


for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

except Exception as e:
logger.error(f"Error saving csv {e}")

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()


def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):

# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")

url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0

while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")

# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))

# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue

search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)

data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")


def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)

# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")



Step 5: Production Run

Once the crawler is complete, set PAGES to the desired number, and initiate a production run. Tweak the following constants as needed:

MAX_THREADS = 5
PAGES = 5

The main function would be:

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"  # not used by this basic script

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info("Crawl complete.")

If everything goes well, the final result should be like this:

Screenshot 7

Crawling 5 pages of Google results took 16.469 seconds, which works out to 16.469 / 5 = 3.2938 seconds per page.


Build A Quora Scraper

Now, to scrape the answers from Quora, we take the following steps:

  1. Read the CSV file generated from the Google search results.
  2. Open each post listed in the CSV file and parse the answer data from the post page.
  3. Store the data.
  4. Add concurrency to steps 2 and 3 to process multiple posts at the same time.
  5. Run the scraper.

Step 1: Create Simple Answer Data Parser

The goal of this step is to scrape each Quora post and extract the main content, specifically the answers and relevant replies, while filtering out non-relevant data such as promoted or related responses.

The process_post function is responsible for visiting a Quora post, waiting for the content to load, and then extracting the answers. It uses Selenium to interact with the dynamically loaded elements on the Quora page.

Here's how it works:

def process_post(row, retries=3):
    with webdriver.Chrome(service=service, options=options) as driver:
        logger.info(f"Processing row: {row}")
        url = row.get("url")
        if not url:
            logger.error(f"No URL found in row: {row}")
            return

        success = False
        tries = 0
        while tries < retries and not success:
            try:
                # Step 1: Open the URL and wait for the main content to load
                driver.get(url)
                logger.info(f"Accessing {url}")
                wait = WebDriverWait(driver, 10)
                main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))

                # Step 2: Extract answer cards
                answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
                if not answer_cards:
                    logger.warning(f"No answer cards found at {url}")

                # Step 3: Make sure the row has a post name (used for the replies CSV in Step 3)
                if 'name' not in row:
                    logger.error(f"'name' key missing in row: {row}")
                    break

                last_seen_name = ""

                # Step 4: Loop through each answer card and extract name and reply
                for answer_card in answer_cards:
                    try:
                        name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
                        name = name_element.text.replace("\n", "").strip()

                        reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
                        reply = reply_element.text.strip()

                        # Filter out promoted content and related questions
                        if "Sponsored" in name:
                            continue
                        if "Related questions" in name:
                            break
                        if name == last_seen_name:
                            continue
                        last_seen_name = name

                        print("name:", name)
                        print("reply:", reply)
                    except Exception as e:
                        continue

                success = True

            except Exception as e:
                logger.error(f"Exception thrown while processing {url}: {e}")
                tries += 1
                if tries >= retries:
                    logger.error(f"Max retries exceeded for {url}")
                else:
                    logger.info(f"Retrying {url} ({tries}/{retries})")
                    time.sleep(2)
  • Extracting Answers: Uses Selenium to wait for and locate the main content of a Quora post. It then extracts each individual answer (skipping promoted or irrelevant content) and stores it.
  • Retries: Includes a retry mechanism to handle temporary failures, such as page load errors.

Step 2: Loading URLs To Scrape

Once you have scraped URLs from Google search results, you need to load these URLs into the scraper to process each Quora post.

This is handled by the process_results function, which loads URLs from a CSV file and calls process_post to scrape each post.

def process_results(csv_file, max_threads=5, retries=3):
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        logger.info("file opened")

    for row in reader:
        process_post(row, retries)
  • Reading URLs: This function opens the CSV file generated by the search crawler, reads the URLs of the Quora posts, and processes them sequentially.
  • Processing Each Post: For each URL, it calls the process_post function to scrape the content.
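If you later want to process several posts at once, the same ThreadPoolExecutor pattern used by the crawler applies here too; this threaded variant is essentially the version used in the complete scraper at the top of this article (each process_post call opens its own headless browser), while the step-by-step code below keeps the sequential version:

from concurrent.futures import ThreadPoolExecutor

def process_results(csv_file, max_threads=5, retries=3):
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        logger.info("file opened")

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        for row in reader:
            executor.submit(process_post, row, retries)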

The full code would be:

import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC



# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Selenium configuration

# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary

# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)

# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
options.headless = True # Runs Chrome in headless mode (without GUI)

@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)

if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()

with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)


for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

except Exception as e:
logger.error(f"Error saving csv {e}")

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):

# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")

url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0

while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")

# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))

# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue

search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)

data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")


def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)

# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes

def process_post(row, retries=3):
with webdriver.Chrome(service=service, options=options) as driver:
logger.info(f"Processing row: {row}")
url = row.get("url")
if not url:
logger.error(f"No URL found in row: {row}")
return

success = False
tries = 0
while tries < retries and not success:
try:
# Step 1: Open the URL and wait for the main content to load
driver.get(url)
logger.info(f"Accessing {url}")
wait = WebDriverWait(driver, 10)
main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))

# Step 2: Extract answer cards
answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
if not answer_cards:
logger.warning(f"No answer cards found at {url}")

# Step 3: Initialize a DataPipeline to store replies
if 'name' not in row:
logger.error(f"'name' key missing in row: {row}")
break

last_seen_name = ""

# Step 4: Loop through each answer card and extract name and reply
for answer_card in answer_cards:
try:
name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
name = name_element.text.replace("\n", "").strip()

reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
reply = reply_element.text.strip()

# Filter out promoted content and related questions
if "Sponsored" in name:
continue
if "Related questions" in name:
break
if name == last_seen_name:
continue
last_seen_name = name

print("name:", name)
print("reply:", reply)
except Exception as e:
continue

success = True

except Exception as e:
logger.error(f"Exception thrown while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

def process_results(csv_file, max_threads=5, retries=3):
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
logger.info(f"file opened")

for row in reader:
process_post(row, retries)



if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)


Step 3: Storing the Scraped Data

The extracted data (such as answers and user names) is stored in a CSV file using the ReplyData class and the DataPipeline class.

Each scraped answer is stored as an instance of ReplyData, ensuring that the scraped content is well-structured.

ReplyData Class:

@dataclass
class ReplyData:
    name: str = ""
    reply: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if isinstance(value, str):
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    setattr(self, field.name, value.strip())
  • Data Structuring: The ReplyData class is used to store the name of the user and the content of their reply in a structured format.
  • Field Validation: The check_string_fields method ensures that empty or malformed strings are handled by assigning a default value or removing unnecessary whitespace.

Each instance of ReplyData is passed to the DataPipeline class for storage in a CSV file.
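Wired into process_post, each post gets its own DataPipeline whose CSV is named after the post title, and every extracted answer is added as a ReplyData instance. A condensed sketch (this mirrors the complete scraper shown at the top of the article):

# Inside process_post, after the answer cards have been collected (condensed sketch)
answer_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

for answer_card in answer_cards:
    name = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative").text.replace("\n", "").strip()
    reply = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content").text.strip()
    answer_pipeline.add_data(ReplyData(name=name, reply=reply))

answer_pipeline.close_pipeline()  # write any remaining replies to the per-post CSV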

The full code would be:

import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC



# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Selenium configuration

# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary

# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)

# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
options.headless = True # Runs Chrome in headless mode (without GUI)

@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())

@dataclass
class ReplyData:
name: str = ""
reply: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)

if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()

with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)


for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

except Exception as e:
logger.error(f"Error saving csv {e}")

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):

# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")

url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0

while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")

# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))

# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue

search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)

data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")


def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)

# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes

def process_post(row, retries=3):
with webdriver.Chrome(service=service, options=options) as driver:
logger.info(f"Processing row: {row}")
url = row.get("url")
if not url:
logger.error(f"No URL found in row: {row}")
return
logger.info(f"Processing URL: {url}")
success = False
tries = 0

while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")

# Use explicit wait to ensure main content is loaded
wait = WebDriverWait(driver, 10)
main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))

# Extract answer cards
answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
if not answer_cards:
logger.warning(f"No answer cards found at {url}")

# Initialize a new DataPipeline for replies
if 'name' not in row:
logger.error(f"'name' key missing in row: {row}")
break

answer_pipeline = DataPipeline(
csv_filename=f"{row['name'].replace(' ', '-')}.csv"
)
last_seen_name = ""

for answer_card in answer_cards:
try:
name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
name = name_element.text.replace("\n", "").strip()
reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
reply = reply_element.text.strip()

if "Sponsored" in name:
continue
if "Related questions" in name:
break
if name == last_seen_name:
continue
last_seen_name = name

reply_data = ReplyData(name=name, reply=reply)
answer_pipeline.add_data(reply_data)
except Exception as e:
continue

answer_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

def process_results(csv_file, max_threads=5, retries=3):
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
logger.info(f"file opened")

for row in reader:
process_post(row, retries)



if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)


Step 4: Adding Concurrency

To scrape multiple Quora posts concurrently and improve efficiency, you can modify the process_results function to use ThreadPoolExecutor. This allows the scraper to handle multiple posts at once, significantly speeding up the process.

from concurrent.futures import ThreadPoolExecutor

def process_results(csv_file, max_threads=5, retries=3):
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        logger.info("file opened")

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        for row in reader:
            executor.submit(process_post, row, retries)
  • Threading: ThreadPoolExecutor is used to run multiple threads, allowing the scraper to process several Quora posts simultaneously.
  • Concurrency: The max_workers parameter defines the number of threads running concurrently. Each thread calls the process_post function to handle a single Quora post.
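
One optional refinement, shown below as a sketch rather than a change the tutorial itself makes: collect the submitted futures and call result() on each, exactly as start_scrape already does. That way, any exception raised inside process_post is re-raised in the main thread instead of being swallowed silently. The snippet assumes csv, logger, and process_post from the script above are in scope.

from concurrent.futures import ThreadPoolExecutor

def process_results(csv_file, max_threads=5, retries=3):
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        logger.info("file opened")

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = [executor.submit(process_post, row, retries) for row in reader]
        for future in futures:
            future.result()  # blocks until the worker finishes and re-raises its exception, if any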

The full code would be:

import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC



# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Selenium configuration

# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary

# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)

# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu") # Required for headless mode in some environments
options.add_argument("--no-sandbox") # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systems
options.headless = True # Runs Chrome in headless mode (without GUI)

@dataclass
class SearchData:
name: str = ""
url: str = ""
rank: int = 0

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())

@dataclass
class ReplyData:
name: str = ""
reply: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
value = getattr(self, field.name)
if isinstance(value, str):
if not value:
setattr(self, field.name, f"No {field.name}")
else:
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
try:
self.csv_file_open = True
data_to_save = self.storage_queue.copy()
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
# Filter out invalid characters from the filename
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
logger.info(valid_filename)
file_exists = (
os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0
)

if not file_exists:
with open(valid_filename, 'w', newline='') as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
writer.writeheader()

with open(
valid_filename, mode="a", newline="", encoding="utf-8"
) as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)


for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

except Exception as e:
logger.error(f"Error saving csv {e}")

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
logger.info("adding data")
logger.info(scraped_data)
if not self.is_duplicate(scraped_data):
self.storage_queue.append(scraped_data)
if (
len(self.storage_queue) >= self.storage_queue_limit
and not self.csv_file_open
):
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if self.storage_queue:
self.save_to_csv()

def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):

# Use a context manager to ensure the driver is properly closed
with webdriver.Chrome(service=service, options=options) as driver:
formatted_keyword = keyword.replace(" ", "+")
result_number = page_number * 10
logger.info(f"page {page_number}")

url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
success = False
tries = 0

while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")

# Use explicit wait to ensure elements are loaded
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.ID, "rso")))

# Extract search result cards
for i in range(1, 11):
try:
# Attempt primary XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
except:
try:
# Fallback XPath
name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
except Exception as e:
continue

search_data = SearchData(
name=name,
url=link,
rank=result_number + i # Increment rank per result
)

data_pipeline.add_data(search_data)

logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")


def start_scrape(
keyword, pages, data_pipeline=None, max_threads=5, retries=3
):
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for page in range(pages):
# No need to pass the driver anymore, each thread will create its own
futures.append(
executor.submit(
scrape_search_results,
keyword,
page,
data_pipeline,
retries,
)
)

# Ensure all threads complete
for future in futures:
future.result() # This blocks until the thread finishes

def process_post(row, retries=3):
with webdriver.Chrome(service=service, options=options) as driver:
logger.info(f"Processing row: {row}")
url = row.get("url")
if not url:
logger.error(f"No URL found in row: {row}")
return
logger.info(f"Processing URL: {url}")
success = False
tries = 0

while tries < retries and not success:
try:
driver.get(url)
logger.info(f"Accessing {url}")

# Use explicit wait to ensure main content is loaded
wait = WebDriverWait(driver, 10)
main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))

# Extract answer cards
answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
if not answer_cards:
logger.warning(f"No answer cards found at {url}")

# Initialize a new DataPipeline for replies
if 'name' not in row:
logger.error(f"'name' key missing in row: {row}")
break

answer_pipeline = DataPipeline(
csv_filename=f"{row['name'].replace(' ', '-')}.csv"
)
last_seen_name = ""

for answer_card in answer_cards:
try:
name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
name = name_element.text.replace("\n", "").strip()
reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
reply = reply_element.text.strip()

if "Sponsored" in name:
continue
if "Related questions" in name:
break
if name == last_seen_name:
continue
last_seen_name = name

reply_data = ReplyData(name=name, reply=reply)
answer_pipeline.add_data(reply_data)
except Exception as e:
continue

answer_pipeline.close_pipeline()
success = True

except Exception as e:
logger.error(f"Exception thrown while processing {url}: {e}")
tries += 1
if tries >= retries:
logger.error(f"Max retries exceeded for {url}")
else:
logger.info(f"Retrying {url} ({tries}/{retries})")
time.sleep(2)

def process_results(csv_file, max_threads=5, retries=3):
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
logger.info(f"file opened")

with ThreadPoolExecutor(max_workers=max_threads) as executor:
for row in reader:
executor.submit(process_post, row, retries)

if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
PAGES = 5
logger.info(f"Crawl starting...")
# INPUT ---> List of keywords to scrape
keyword_list = ["learn rust"]
aggregate_files = []
# Job Processes
for keyword in keyword_list:
filename = keyword.replace(" ", "-")
crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
aggregate_files.append(f"{filename}.csv")
logger.info(f"Crawl complete.")
for file in aggregate_files:
process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)


Step 5: Production Run

Finally, when your scraper is ready to run on a larger dataset, you can execute the full scraping process. This includes scraping search results for multiple keywords, storing the data in CSV files, and processing the scraped URLs concurrently.

The main block of the script would be:

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    logger.info("Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    # Job Processes: Scraping Search Results
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info("Crawl complete.")

    # Processing Scraped Quora Posts
    for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
  • Scraping Search Results: For each keyword in the keyword_list, the scraper collects search results from Google and stores them in a CSV file.
  • Concurrent Processing: After collecting the URLs, the process_results function processes each Quora post concurrently, using multiple threads for efficiency.
  • Parameters: You can adjust MAX_RETRIES, MAX_THREADS, and PAGES to fine-tune the scraper's performance, as in the sample configuration below. More threads will increase speed, but be mindful of server load and anti-bot measures.
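
For example, a hypothetical configuration for a larger run might look like the following; the extra keywords are made up, so tune the numbers to your own hardware and your tolerance for anti-bot pushback.

# Hypothetical production settings; adjust to your environment.
MAX_RETRIES = 3
MAX_THREADS = 8   # more concurrent headless Chrome sessions: faster, but heavier on CPU and RAM
PAGES = 10        # ten Google result pages per keyword (roughly 100 Quora URLs)
keyword_list = ["learn rust", "rust vs go", "rust web frameworks"]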

After running the code, if everything runs fine, you will get the following results:

Screenshot 8

The crawl of all the Quora posts took 651.258 seconds in total, of which 16.469 seconds were spent on the Google search pages. That leaves 651.258 - 16.469 = 634.789 seconds for scraping the posts themselves. Across 50 posts, that works out to 634.789 / 50 ≈ 12.7 seconds per post.
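
The figures above presumably come from timing each phase of the run. If you want to reproduce that measurement yourself, one minimal approach (a sketch, not part of the original script) is to wrap each phase with time.perf_counter(); the timed helper below is a hypothetical addition that assumes logger, start_scrape, and process_results from the script above are in scope.

import time

def timed(label, fn, *args, **kwargs):
    """Run fn(*args, **kwargs) and log how long it took."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    logger.info(f"{label} took {elapsed:.3f} seconds")
    return result

# Example usage inside the __main__ block:
# timed("Google crawl", start_scrape, keyword, PAGES,
#       data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
# timed("Post scraping", process_results, file,
#       max_threads=MAX_THREADS, retries=MAX_RETRIES)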


Legal and Ethical Considerations

When scraping the web, always pay attention to your target site's Terms of Service and its robots.txt file. Legal or not, violating a site's terms can get your account suspended or even permanently banned.

Public data is typically free to scrape, but be cautious when dealing with private or gated content.

When scraping Quora specifically, review their Terms of Service and robots.txt file, and make sure your scraping activities do not cross legal or ethical lines.

When scraping private data, you are subject to the site's terms and to the privacy laws of the site's jurisdiction. If you are unsure whether your scraper is legal, consult an attorney.


Conclusion

This guide walked you through building a robust Quora scraper using Python and Selenium. With parsing logic, pagination, and concurrency in place, you're now equipped to scrape Quora effectively. Be sure to follow ethical guidelines and keep an eye on your scraper's performance.

If you'd like to learn more about the tech stack used in this article, check out these links below.


More Python Web Scraping Guides

Here at ScrapeOps, we've got a ton of learning resources. Whether you're brand new or a seasoned web developer, we've got something for you.

Check out our extensive Selenium Web Scraping Playbook and build something!

If you'd like to learn more from our "How To Scrape" series, take a look at the links below.

Check out more tutorials and guides: