Then check out ScrapeOps, the complete toolkit for web scraping.
To run it yourself, create a config.json file with your "api_key". After you're done, add a new Python file with the code below. Feel free to change any of the constants inside the main
to tweak your results.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    rank: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReplyData:
    name: str = ""
    reply: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    result_number = page_number * 10
    url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div span")

            for div_card in div_cards:
                name = div_card.find("h3")
                link = div_card.find("a")
                if not name or not link:
                    continue
                result_number += 1
                ranking = result_number

                search_data = SearchData(
                    name=name.text,
                    url=link.get("href"),
                    rank=ranking
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1  # count this attempt so a persistently failing page eventually gives up

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )


def process_post(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                main_content = soup.select_one("div[id='mainContent']")
                answer_cards = main_content.select("div div div div div[class='q-box']")

                answer_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
                last_seen_name = ""
                for answer_card in answer_cards:
                    excluded_words = ["All related", "Recommended"]
                    array = answer_card.text.split("·")
                    if len(array) < 3:
                        continue
                    promoted = "Promoted" in array[0]
                    related = "Related" in array[2][0:30] or "Related" in array[-2][0:30]
                    repeat_name = array[0] in last_seen_name or array[0] == last_seen_name
                    if promoted or related or repeat_name:
                        last_seen_name = array[0]
                        continue
                    reply_data = ReplyData(
                        name=array[0],
                        reply=array[-2]
                    )
                    answer_pipeline.add_data(reply_data)

                answer_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_post,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
- MAX_THREADS: Controls the number of threads that the program will use for concurrent execution.
- MAX_RETRIES: Defines the number of times the scraper will retry a failed request before giving up.
- PAGES: Determines how many pages of Google search results to scrape for each keyword.
- LOCATION: Specifies the geographical location (country) for the Google search.
- keyword_list: This is a list of keywords for which the script will perform the search and subsequent scraping.

A Google search for Quora posts about "learn rust" uses a URL like this:

https://www.google.com/search?q=learn%20rust%20site%3Aquora.com

More generally, for any formatted keyword, the search URL looks like this:

https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com

Individual Quora posts follow this URL structure:

https://www.quora.com/How-do-I-learn-the-Rust-programming-language

https://www.quora.com/Name-of-your-post

On a Quora post, each reply sits inside a div card with a class of q-box.

Google paginates its results with the start parameter, in steps of ten results per page:

https://www.google.com/search?q={query}&start={page * 10}
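To make the pagination arithmetic concrete, here is a small standalone sketch (the keyword and page count are arbitrary example values) that builds search URLs the same way the scraper does:

# Page 0 starts at result 0, page 1 at result 10, and so on.
keyword = "learn rust"
formatted_keyword = keyword.replace(" ", "+")

for page in range(3):
    result_number = page * 10  # Google's start parameter moves in steps of 10
    url = (
        "https://www.google.com/search?"
        f"q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
    )
    print(url)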
To control where your requests come from, pass the ScrapeOps API a country and you'll be routed through a server in the country of your choosing. To appear in the US, pass "country": "us". To appear in the UK, pass "country": "uk".
Create a new project folder and a virtual environment:

mkdir quora-scraper
cd quora-scraper
python -m venv venv

Activate the environment:

source venv/bin/activate

Install the dependencies:

pip install requests
pip install beautifulsoup4
Here is the first iteration of our crawler.

import os
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com"
    tries = 0
    success = False
    result_number = 0  # running counter used to rank results

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code == 200:
                success = True
            else:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div span")

            for div_card in div_cards:
                name = div_card.find("h3")
                link = div_card.find("a")
                if not name or not link:
                    continue
                result_number += 1
                ranking = result_number

                search_data = {
                    "name": name.text,
                    "url": link.get("href"),
                    "rank": ranking
                }

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1  # count this attempt so a persistently failing page eventually gives up

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 1
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        scrape_search_results(keyword, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
In the parsing logic above, we find our result cards with soup.select("div span"). Then, for each div_card:

- name = div_card.find("h3") finds the post name on Quora.
- link = div_card.find("a") finds the link to the post.

To paginate our results, we add the start parameter to our URL. With the start param added in, our URL now looks like this:

https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}
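To see how the find() calls behave, here is a rough illustration on a simplified stand-in snippet (this is not Google's real markup, which is far messier):

from bs4 import BeautifulSoup

# Simplified stand-in for a single Google result card.
sample_html = """
<div><span>
  <a href="https://www.quora.com/How-do-I-learn-the-Rust-programming-language">
    <h3>How do I learn the Rust programming language?</h3>
  </a>
</span></div>
"""

soup = BeautifulSoup(sample_html, "html.parser")
for div_card in soup.select("div span"):
    name = div_card.find("h3")
    link = div_card.find("a")
    if not name or not link:
        continue
    print(name.text.strip(), "->", link.get("href"))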
Next, we add a start_scrape() function. This one is pretty simple at the moment. It iterates through our pages and runs scrape_search_results() on each page.

def start_scrape(keyword, pages, location, data_pipeline=None, retries=3):
    for page in range(pages):
        scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries)
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) def scrape_search_results(keyword, location, page_number, retries=3): formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div span") for div_card in div_cards: name = div_card.find("h3") link = div_card.find("a") if not name or not link: continue result_number += 1 ranking = result_number search_data = { "name": name.text, "url": link.get("href"), "rank": ranking } logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") start_scrape(keyword, PAGES, LOCATION, retries=MAX_RETRIES) logger.info(f"Crawl complete.")
Now we'll add a SearchData class and then we'll add a DataPipeline class.

SearchData is a dataclass that exists specifically to hold data we've scraped. Once we've transformed a result on the page into SearchData, we need to pass it into our DataPipeline. DataPipeline pipes this data to a CSV file and filters out our duplicates.

Here is our SearchData. It holds a name, url, and rank.

@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    rank: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
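As a tiny sketch (made-up values, assuming the SearchData class above is defined), here is what __post_init__ does to the fields:

dirty = SearchData(name="  How do I learn Rust?  ", url="", rank=1)
print(dirty.name)  # "How do I learn Rust?"  (whitespace stripped)
print(dirty.url)   # "No url"                (empty string replaced with a default)
print(dirty.rank)  # 1                       (non-string fields are left alone)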
Here is our DataPipeline.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        # time.sleep() here requires "import time" at the top of the file
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
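Here is a minimal usage sketch showing how a parsed result flows through the pipeline (the filename and values are placeholders, not part of the scraper itself):

pipeline = DataPipeline(csv_filename="example-results.csv", storage_queue_limit=50)

pipeline.add_data(SearchData(name="How do I learn Rust?", url="https://www.quora.com/example", rank=1))
# A second item with the same name is detected by is_duplicate() and dropped.
pipeline.add_data(SearchData(name="How do I learn Rust?", url="https://www.quora.com/example", rank=2))

# Anything still sitting in the queue gets written to example-results.csv.
pipeline.close_pipeline()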
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div span") for div_card in div_cards: name = div_card.find("h3") link = div_card.find("a") if not name or not link: continue result_number += 1 ranking = result_number search_data = SearchData( name=name.text, url=link.get("href"), rank=ranking ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, location, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Inside main, we now open a DataPipeline. This DataPipeline then gets passed into start_scrape() and then scrape_search_results(). We then use the add_data() method to add this data to our pipeline.

start_scrape() already gives us the ability to run scrape_search_results() on multiple pages, but we want to run it on multiple pages simultaneously. In order to accomplish this, we'll be using ThreadPoolExecutor.

Here is our new start_scrape() function.

def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            [keyword] * pages,
            [location] * pages,
            range(pages),
            [data_pipeline] * pages,
            [retries] * pages
        )
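If executor.map() is new to you, here is a toy example (deliberately unrelated to scraping, with made-up values) showing how the argument iterables line up:

import concurrent.futures

def greet(name, punctuation):
    return f"Hello, {name}{punctuation}"

names = ["page 0", "page 1", "page 2"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # The nth call receives the nth element of each iterable:
    # greet("page 0", "!"), greet("page 1", "!"), greet("page 2", "!")
    for result in executor.map(greet, names, ["!"] * len(names)):
        print(result)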
scrape_search_results is the first argument passed into executor.map(). This is the function we want to run on each available thread. All of our other arguments get passed into executor.map()
as arrays that then get passed in on the individual threads.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div span") for div_card in div_cards: name = div_card.find("h3") link = div_card.find("a") if not name or not link: continue result_number += 1 ranking = result_number search_data = SearchData( name=name.text, url=link.get("href"), rank=ranking ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Our proxy logic lives in get_scrapeops_url(), which takes in a url and a location. It also uses a wait parameter which tells ScrapeOps how long to wait before sending back our results. Our function takes in all of this information and then converts it to a proxied url with all of our custom parameters.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 2000
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
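Before breaking down each field, here is a quick sketch of what the function produces, using Python's urlencode directly (the API key and target URL are placeholders):

from urllib.parse import urlencode

payload = {
    "api_key": "YOUR-API-KEY",  # placeholder
    "url": "https://www.google.com/search?q=learn+rust%20site%3Aquora.com",
    "country": "us",
    "wait": 2000,
}
print("https://proxy.scrapeops.io/v1/?" + urlencode(payload))
# The target URL is percent-encoded into a single query parameter,
# so ScrapeOps receives the full Google URL intact.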
"api_key"
holds our ScrapeOps API key."url"
represents the url we'd like to scrape."country"
is the country we want to be routed through."wait"
is the period we want ScrapeOps to wait before sending our results.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div span") for div_card in div_cards: name = div_card.find("h3") link = div_card.find("a") if not name or not link: continue result_number += 1 ranking = result_number search_data = SearchData( name=name.text, url=link.get("href"), rank=ranking ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.")
Now we'll set PAGES to 5 and time the operation. Here is our updated main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")
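The timing itself isn't shown in the script. If you want to measure the crawl, one option (a sketch that assumes it replaces the start_scrape() and close_pipeline() calls inside the for loop in main) is to wrap them in time.perf_counter():

import time

start_time = time.perf_counter()
start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline,
             max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
elapsed = time.perf_counter() - start_time
logger.info(f"Crawled {PAGES} pages in {elapsed:.2f} seconds")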
As before, feel free to change any of the following constants inside main to tweak your own results (an example configuration follows the list):

- MAX_RETRIES
- PAGES
- MAX_THREADS
- LOCATION
- keyword_list
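For instance, a hypothetical configuration that crawls two keywords from the UK with more aggressive concurrency might look like this (the values are arbitrary, not recommendations):

if __name__ == "__main__":
    MAX_RETRIES = 5
    MAX_THREADS = 10
    PAGES = 3
    LOCATION = "uk"

    keyword_list = ["learn rust", "rust vs go"]
    # ...the rest of the job loop stays the same as in the script above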
Next, we add the parsing function for individual Quora posts, process_post(). For now, it just prints the replies it finds.

def process_post(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        # The proxy isn't wired in yet; location gets used once we route this
        # request through get_scrapeops_url() later on.
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                main_content = soup.select_one("div[id='mainContent']")
                answer_cards = main_content.select("div div div div div[class='q-box']")

                last_seen_name = ""
                for answer_card in answer_cards:
                    excluded_words = ["All related", "Recommended"]
                    array = answer_card.text.split("·")
                    if len(array) < 3:
                        continue
                    promoted = "Promoted" in array[0]
                    related = "Related" in array[2][0:30] or "Related" in array[-2][0:30]
                    repeat_name = array[0] in last_seen_name or array[0] == last_seen_name
                    if promoted or related or repeat_name:
                        last_seen_name = array[0]
                        continue
                    print("name:", array[0])
                    print("reply:", array[-2])

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
We first find our main_content box with soup.select_one("div[id='mainContent']"). We then find the div elements holding our replies with main_content.select("div div div div div[class='q-box']"). As we iterate through the reply cards, we filter out each "Promoted" or "Related" reply. For the replies we keep, we pull the "name" and "reply" out of the string array.

Just like with our crawler, instead of calling the parser directly from main, similar to our start_scrape() function, we'll create a new one, process_results(). This function will read the CSV file and call process_post() on all the rows from the file.

Here is process_results().

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_post(row, location, retries=retries)
We read the CSV file into an array of rows. Then we iterate through it with a for
loop and then we run process_post()
on each row from the file.Here is our full code up to this point.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div span") for div_card in div_cards: name = div_card.find("h3") link = div_card.find("a") if not name or not link: continue result_number += 1 ranking = result_number search_data = SearchData( name=name.text, url=link.get("href"), rank=ranking ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_post(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") main_content = soup.select_one("div[id='mainContent']") answer_cards = main_content.select("div div div div div[class='q-box']") last_seen_name = "" for answer_card in answer_cards: excluded_words = ["All related", "Recommended"] array = answer_card.text.split("·") if len(array) < 3: continue promoted = "Promoted" in array[0] related = "Related" in array[2][0:30] or "Related" in array[-2][0:30] repeat_name = array[0] in last_seen_name or array[0] == last_seen_name if promoted or related or repeat_name: last_seen_name = array[0] continue print("name:", array[0]) print("reply:", array[-2]) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: 
reader = list(csv.DictReader(file)) for row in reader: process_post(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
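To make the reply-filtering heuristic in process_post() easier to follow, here is a toy version run on invented card text (real Quora answer cards are messier than this, so treat it purely as an illustration):

# Made-up examples of the "·"-separated text you get from an answer card.
sample_cards = [
    "Promoted by ExampleCo · Sponsored · Learn Rust fast with our course · 2022",
    "Jane Doe · Software engineer · Start with the official Rust book and small projects. · 1y",
    "All related · 42 answers · Related: What is the best way to learn C++? · 3y",
]

last_seen_name = ""
for text in sample_cards:
    array = text.split("·")
    if len(array) < 3:
        continue
    promoted = "Promoted" in array[0]                                    # skips the first card
    related = "Related" in array[2][0:30] or "Related" in array[-2][0:30]  # skips the third card
    repeat_name = array[0] in last_seen_name or array[0] == last_seen_name
    if promoted or related or repeat_name:
        last_seen_name = array[0]
        continue
    print("name:", array[0])    # only the second card survives the filters
    print("reply:", array[-2])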
Since we already have a working DataPipeline, we just need another dataclass. In this section, we'll create a ReplyData class. It's going to hold a name and the content of the reply.

Here is our new dataclass, ReplyData.

@dataclass
class ReplyData:
    name: str = ""
    reply: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
Instead of printing each reply, we now open a new DataPipeline
instance during our parsing function.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReplyData: name: str = "" reply: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div span") for div_card in div_cards: name = div_card.find("h3") link = div_card.find("a") if not name or not link: continue result_number += 1 ranking = result_number search_data = SearchData( name=name.text, url=link.get("href"), rank=ranking ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_post(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url, location=location) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") main_content = soup.select_one("div[id='mainContent']") answer_cards = main_content.select("div div div div div[class='q-box']") answer_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") last_seen_name = "" for answer_card in answer_cards: excluded_words = ["All related", "Recommended"] array = answer_card.text.split("·") if len(array) < 3: continue promoted = "Promoted" in array[0] related = "Related" in array[2][0:30] or "Related" in array[-2][0:30] repeat_name = array[0] in last_seen_name or array[0] == last_seen_name if promoted or related or repeat_name: last_seen_name = array[0] continue reply_data = ReplyData( name=array[0], reply=array[-2] ) answer_pipeline.add_data(reply_data) answer_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: 
{row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_post(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
With ReplyData and another DataPipeline, we can now pipe reply data straight to a CSV file.

To add concurrency, we use ThreadPoolExecutor just like we did earlier. We pass in process_post as our first argument, and then we pass subsequent arguments in as arrays, just like we did before.

Here is our refactored process_results() function.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_post,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
process_post is the function we want to run on each available thread. All other arguments to process_post get passed into executor.map() as arrays.

Finally, to route these requests through the ScrapeOps Proxy as well, we change one line of process_post():

response = requests.get(get_scrapeops_url(url, location=location))
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 2000 } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReplyData: name: str = "" reply: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, page_number, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code == 200: success = True else: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div span") for div_card in div_cards: name = div_card.find("h3") link = div_card.find("a") if not name or not link: continue result_number += 1 ranking = result_number search_data = SearchData( name=name.text, url=link.get("href"), rank=ranking ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keyword, pages, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, [keyword] * pages, [location] * pages, range(pages), [data_pipeline] * pages, [retries] * pages ) def process_post(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") main_content = soup.select_one("div[id='mainContent']") answer_cards = main_content.select("div div div div div[class='q-box']") answer_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") last_seen_name = "" for answer_card in answer_cards: excluded_words = ["All related", "Recommended"] array = answer_card.text.split("·") if len(array) < 3: continue promoted = "Promoted" in array[0] related = "Related" in array[2][0:30] or "Related" in array[-2][0:30] repeat_name = array[0] in last_seen_name or array[0] == last_seen_name if promoted or related or repeat_name: last_seen_name = array[0] continue reply_data = ReplyData( name=array[0], reply=array[-2] ) answer_pipeline.add_data(reply_data) answer_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: 
logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_post, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 1 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] ## Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Now we'll run the full scraper in production. Once again, PAGES is set to 5. Here is our final main.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    ## Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")
    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
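Once the run finishes, you can spot-check the output with a few lines of Python (the filename below assumes the default "learn rust" keyword; each individual post also gets its own reply CSV):

import csv

# "learn rust" -> "learn-rust.csv", produced by the crawl above.
with open("learn-rust.csv", newline="", encoding="utf-8") as file:
    rows = list(csv.DictReader(file))

print(f"{len(rows)} results")
for row in rows[:5]:
    print(row["rank"], row["name"], row["url"])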
When scraping, you should always pay attention to the target site's terms of service and its robots.txt. Legal or not, when you violate a site's terms, you can get suspended or even permanently banned. Quora's terms are available to view here. Here is their robots.txt.
Public data is generally free to scrape. Public data is any data that is not gated behind a login page or some other type of authentication. When scraping private data, you are subject to a site's terms and privacy laws in the site's jurisdiction. If you don't know whether your scraper is legal, you should consult an attorney.
MAX_THREADS: Controls the number of concurrent threads.
MAX_RETRIES: Defines the retry attempts for failed requests.
PAGES: Number of Google search result pages to scrape.
LOCATION: Sets the geographical region for search results.
keyword_list: List of search terms to scrape.
Next, create a virtualenv and install the necessary libraries.
import osimport csvimport jsonimport loggingimport timeimport stringfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.chrome.service import Servicefrom selenium.webdriver.chrome.options import Optionsfrom selenium.webdriver.common.keys import Keysfrom concurrent.futures import ThreadPoolExecutorfrom dataclasses import dataclass, fields, asdictfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as EC # Set up logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # Selenium configuration # Set the path to your ChromeDriverCHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary # Configure the service to use the specified driverservice = Service(CHROMEDRIVER_PATH) # Setup Chrome options for headless browsingoptions = Options()options.add_argument("--headless")options.add_argument("--disable-gpu") # Required for headless mode in some environmentsoptions.add_argument("--no-sandbox") # Especially useful for Linux environmentsoptions.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systemsoptions.headless = True # Runs Chrome in headless mode (without GUI) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): value = getattr(self, field.name) if isinstance(value, str): if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) @dataclassclass ReplyData: name: str = "" reply: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): value = getattr(self, field.name) if isinstance(value, str): if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): try: self.csv_file_open = True data_to_save = self.storage_queue.copy() self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] logger.info("saving file data") logger.info(keys) # Filter out invalid characters from the filename valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits) valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars) logger.info(valid_filename) file_exists = ( os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0 ) if not file_exists: with open(valid_filename, 'w', newline='') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) writer.writeheader() with open( valid_filename, mode="a", newline="", encoding="utf-8" ) as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False except Exception as e: logger.error(f"Error saving csv {e}") def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): logger.info("adding data") logger.info(scraped_data) if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if ( len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open ): self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3): # Use a context manager to ensure the driver is properly closed with webdriver.Chrome(service=service, options=options) as driver: formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 logger.info(f"page {page_number}") url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" success = False tries = 0 while tries < retries and not success: try: driver.get(url) logger.info(f"Accessing {url}") # Use explicit wait to ensure elements are loaded wait = WebDriverWait(driver, 10) wait.until(EC.presence_of_element_located((By.ID, "rso"))) # Extract search result cards for i in range(1, 11): try: # Attempt primary XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href") except: try: # Fallback XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href") except Exception as e: continue search_data = SearchData( name=name, url=link, rank=result_number + i # Increment rank per result ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing {url}: {e}") tries += 1 if tries >= retries: logger.error(f"Max retries exceeded for {url}") else: logger.info(f"Retrying {url} ({tries}/{retries})") time.sleep(2) logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}") def start_scrape( keyword, pages, data_pipeline=None, max_threads=5, retries=3): with ThreadPoolExecutor(max_workers=max_threads) as executor: futures = [] for page in range(pages): # No need to pass the driver anymore, each thread will create its own futures.append( executor.submit( scrape_search_results, keyword, page, data_pipeline, retries, ) ) # Ensure all threads complete for future in futures: future.result() # This blocks until the thread finishes def process_post(row, retries=3): with webdriver.Chrome(service=service, options=options) as driver: logger.info(f"Processing row: {row}") url = row.get("url") if not url: logger.error(f"No URL found in row: {row}") return logger.info(f"Processing URL: {url}") success = False tries = 0 while tries < retries and not success: try: driver.get(url) logger.info(f"Accessing {url}") # Use explicit wait to ensure main content is loaded wait = WebDriverWait(driver, 10) main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']"))) # Extract answer cards answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper") if not answer_cards: logger.warning(f"No answer cards found at {url}") # Initialize a new DataPipeline for replies if 'name' not in row: 
logger.error(f"'name' key missing in row: {row}") break answer_pipeline = DataPipeline( csv_filename=f"{row['name'].replace(' ', '-')}.csv" ) last_seen_name = "" for answer_card in answer_cards: try: name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative") name = name_element.text.replace("\n", "").strip() reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content") reply = reply_element.text.strip() if "Sponsored" in name: continue if "Related questions" in name: break if name == last_seen_name: continue last_seen_name = name reply_data = ReplyData(name=name, reply=reply) answer_pipeline.add_data(reply_data) except Exception as e: continue answer_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown while processing {url}: {e}") tries += 1 if tries >= retries: logger.error(f"Max retries exceeded for {url}") else: logger.info(f"Retrying {url} ({tries}/{retries})") time.sleep(2) def process_results(csv_file, max_threads=5, retries=3): with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) logger.info(f"file opened") with ThreadPoolExecutor(max_workers=max_threads) as executor: for row in reader: executor.submit(process_post, row, retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 5 logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com
{formatted_keyword} is the term you're searching for on Quora. For example, searching for "learn Rust" on Quora via Google would look like this:
https://www.google.com/search?q=learn+rust+site%3Aquora.com
The site%3Aquora.com portion (URL-encoded site:quora.com) restricts the results to quora.com.
When you inspect the results page, the title of a result can be found at an XPath like //*[@id='rso']/div[1]/div/div[1]/a/h3.
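As a quick standalone sketch (not part of the final scraper, and assuming Selenium can locate a matching chromedriver), that XPath can be tried out like this; Google changes its markup frequently, so the path may need adjusting:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options

options = Options()
options.add_argument("--headless")

# Assumes Selenium Manager (or your PATH) can find a matching chromedriver.
with webdriver.Chrome(options=options) as driver:
    driver.get("https://www.google.com/search?q=learn+rust+site%3Aquora.com")
    # The title of the first result lives in an h3 under the #rso results container.
    first_title = driver.find_element(
        By.XPATH, "//*[@id='rso']/div[1]/div/div[1]/a/h3"
    ).text
    print("First result:", first_title)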
The content sits inside deeply nested div tags. Use your browser's inspector to locate elements: on a Quora post, the class q-click-wrapper marks the answer cards.
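Below is a minimal sketch of pulling the answer cards out of a Quora post with that class; the example URL is made up, the wait mirrors the configuration used later, and the selectors may change whenever Quora updates its front end:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

options = Options()
options.add_argument("--headless")

with webdriver.Chrome(options=options) as driver:
    # Hypothetical example post URL, just for illustration.
    driver.get("https://www.quora.com/How-do-I-learn-Rust")
    # Wait for the main content container before querying the answer cards.
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']"))
    )
    answer_cards = driver.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
    print(f"Found {len(answer_cards)} answer cards")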
Google handles pagination through the start parameter. For example:
https://www.google.com/search?q=learn+rust+site%3Aquora.com&start=0
https://www.google.com/search?q=learn+rust+site%3Aquora.com&start=10
By incrementing the start value by 10, you can paginate through all the results.
Create a new project folder and move into it:
mkdir quora-scraper
cd quora-scraper
Then create and activate a virtual environment:
python -m venv venv
source venv/bin/activate  # On Windows use: venv\Scripts\activate
We'll use Selenium for browser automation and WebDriverWait to wait until an element appears in the browser, along with any other necessary libraries. WebDriverWait ships with Selenium, so installing Selenium with pip is enough:
pip install selenium
Place chromedriver.exe in your project folder or somewhere accessible in your system's PATH. Then update CHROMEDRIVER_PATH and set up the Service:
CHROMEDRIVER_PATH = 'chromedriver.exe'  # Adjust this to the actual path if chromedriver is not in the current directory
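As a quick sanity check (a sketch that reuses the service and options objects configured in the code above), you can confirm the driver actually launches in headless mode before wiring up the rest of the scraper:
from selenium import webdriver

# Smoke test: launch headless Chrome with the configured Service, load a page, print its title.
with webdriver.Chrome(service=service, options=options) as driver:
    driver.get("https://www.quora.com")
    print("Loaded page title:", driver.title)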
The scraper then uses Selenium to select the HTML elements (h3 for titles and a
for links).import osimport csvimport jsonimport loggingimport timeimport stringfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.chrome.service import Servicefrom selenium.webdriver.chrome.options import Optionsfrom selenium.webdriver.common.keys import Keysfrom concurrent.futures import ThreadPoolExecutorfrom dataclasses import dataclass, fields, asdictfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as EC # Set up logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # Selenium configuration # Set the path to your ChromeDriverCHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary # Configure the service to use the specified driverservice = Service(CHROMEDRIVER_PATH) # Setup Chrome options for headless browsingoptions = Options()options.add_argument("--headless")options.add_argument("--disable-gpu") # Required for headless mode in some environmentsoptions.add_argument("--no-sandbox") # Especially useful for Linux environmentsoptions.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systemsoptions.headless = True # Runs Chrome in headless mode (without GUI) def scrape_search_results(keyword, data_pipeline=None, retries=3): # Use a context manager to ensure the driver is properly closed with webdriver.Chrome(service=service, options=options) as driver: formatted_keyword = keyword.replace(" ", "+") result_number = 0 logger.info(f"page {page_number}") url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" success = False tries = 0 while tries < retries and not success: try: driver.get(url) logger.info(f"Accessing {url}") # Use explicit wait to ensure elements are loaded wait = WebDriverWait(driver, 10) wait.until(EC.presence_of_element_located((By.ID, "rso"))) # Extract search result cards for i in range(1, 11): try: # Attempt primary XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href") except: try: # Fallback XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href") except Exception as e: continue search_data = SearchData( name=name, url=link, rank=result_number + i # Increment rank per result ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing {url}: {e}") tries += 1 if tries >= retries: logger.error(f"Max retries exceeded for {url}") else: logger.info(f"Retrying {url} ({tries}/{retries})") time.sleep(2) logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}") if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 5 logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") scrape_search_results(keyword, retries=MAX_RETRIES)
Pagination is handled with the start parameter:
result_number = page_number * 10
url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
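As a small worked example (standalone, with a hard-coded keyword), here is how page numbers map onto Google's start values:
formatted_keyword = "learn+rust"

for page_number in range(3):
    result_number = page_number * 10  # page 0 -> start=0, page 1 -> start=10, page 2 -> start=20
    url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"
    print(page_number, url)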
import osimport csvimport jsonimport loggingimport timeimport stringfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.chrome.service import Servicefrom selenium.webdriver.chrome.options import Optionsfrom selenium.webdriver.common.keys import Keysfrom concurrent.futures import ThreadPoolExecutorfrom dataclasses import dataclass, fields, asdictfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as EC # Set up logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # Selenium configuration # Set the path to your ChromeDriverCHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary # Configure the service to use the specified driverservice = Service(CHROMEDRIVER_PATH) # Setup Chrome options for headless browsingoptions = Options()options.add_argument("--headless")options.add_argument("--disable-gpu") # Required for headless mode in some environmentsoptions.add_argument("--no-sandbox") # Especially useful for Linux environmentsoptions.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systemsoptions.headless = True # Runs Chrome in headless mode (without GUI) def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3): # Use a context manager to ensure the driver is properly closed with webdriver.Chrome(service=service, options=options) as driver: formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 logger.info(f"page {page_number}") url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" success = False tries = 0 while tries < retries and not success: try: driver.get(url) logger.info(f"Accessing {url}") # Use explicit wait to ensure elements are loaded wait = WebDriverWait(driver, 10) wait.until(EC.presence_of_element_located((By.ID, "rso"))) # Extract search result cards for i in range(1, 11): try: # Attempt primary XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href") except: try: # Fallback XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href") except Exception as e: continue search_data = SearchData( name=name, url=link, rank=result_number + i # Increment rank per result ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing {url}: {e}") tries += 1 if tries >= retries: logger.error(f"Max retries exceeded for {url}") else: logger.info(f"Retrying {url} ({tries}/{retries})") time.sleep(2) logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}") def start_scrape(keyword, pages, location, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 5 logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") 
start_scrape(keyword, pages=PAGES, retries=MAX_RETRIES)
To store the scraped results properly, we add two classes: SearchData and DataPipeline. These two classes work together to manage the data, ensure no duplicates are stored, and handle writing the data to CSV files. Let's dive into how these classes work and how they facilitate the storage of scraped data.
SearchData Class
The SearchData class represents a single scraped search result from Quora. Each instance of this class stores the name (title), URL, and rank of a Quora search result. Using this class ensures that the scraped data is structured and can be processed systematically. Here is the structure of the SearchData class:
@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    rank: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if isinstance(value, str):
                # If the field is a string and is empty, give it a default value
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    # Strip leading/trailing whitespace
                    setattr(self, field.name, value.strip())
String checking (check_string_fields): After initializing the object, the __post_init__ method checks the string fields (name and url) to ensure they are not empty. If a field is empty, it assigns a default value (No {field.name}), making sure that empty data doesn't enter the pipeline.
Inside the scraper, each result is wrapped in a SearchData instance:
search_data = SearchData(
    name=name,
    url=link,
    rank=result_number + i  # Increment rank per result
)
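To see the defaulting behaviour in isolation, here is a tiny sketch (the values are invented for the example and assume the SearchData class above is already defined):
# An empty name is replaced with a default, and surrounding whitespace is stripped.
item = SearchData(name="", url="  https://www.quora.com/How-do-I-learn-Rust  ", rank=1)
print(item.name)  # -> No name
print(item.url)   # -> https://www.quora.com/How-do-I-learn-Rust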
DataPipeline Class
The DataPipeline class is responsible for managing the collected data and writing it to a CSV file. It performs the following tasks: queueing scraped items, filtering out duplicates based on the name field, and writing the results to disk. Here is the DataPipeline class:
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []  # Track names to avoid duplicates
        self.storage_queue = []  # Temporary storage for scraped data
        self.storage_queue_limit = storage_queue_limit  # Limit before writing to CSV
        self.csv_filename = csv_filename  # Name of the CSV file
        self.csv_file_open = False  # Check if the file is open

    def save_to_csv(self):
        try:
            self.csv_file_open = True
            data_to_save = self.storage_queue.copy()
            self.storage_queue.clear()  # Clear the queue after copying
            if not data_to_save:
                return

            keys = [field.name for field in fields(data_to_save[0])]

            # Ensure the CSV filename is valid
            valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
            valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
            logger.info(valid_filename)

            file_exists = os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0

            # Write the header if the file does not exist
            if not file_exists:
                with open(valid_filename, 'w', newline='') as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    writer.writeheader()

            # Append the data to the CSV
            with open(valid_filename, mode="a", newline="", encoding="utf-8") as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                for item in data_to_save:
                    writer.writerow(asdict(item))

            self.csv_file_open = False
        except Exception as e:
            logger.error(f"Error saving CSV: {e}")

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
Data queueing (storage_queue): Scraped data is first stored in a temporary queue. Once the queue reaches the defined storage_queue_limit (e.g., 50 entries), the data is saved to a CSV file. This avoids frequent I/O operations and optimizes performance.
Duplicate filtering (is_duplicate): Before adding new data to the queue, the is_duplicate method checks whether the data already exists by comparing the name field. If a duplicate is found, it logs a warning and skips the entry.
Saving to CSV (save_to_csv): When the queue is full or when the scraping process is complete, the save_to_csv method is called to write the collected data to a CSV file. It also ensures that the filename is valid and does not contain any illegal characters.
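For illustration, here is the same character filter applied to a made-up filename in isolation:
import string

csv_filename = "learn rust? (page 1).csv"
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
valid_filename = "".join(c for c in csv_filename if c in valid_chars)
print(valid_filename)  # -> learn rust (page 1).csv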
Closing the pipeline (close_pipeline): When scraping is finished, the close_pipeline method ensures that any remaining data in the queue is written to the CSV file.
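Putting the pieces together, a typical pipeline lifecycle looks like the sketch below; it assumes the SearchData and DataPipeline classes above are in scope, and the filename and records are just examples:
pipeline = DataPipeline(csv_filename="example-results.csv", storage_queue_limit=50)

# Items are queued until the limit is reached; duplicates (by name) are dropped.
pipeline.add_data(SearchData(name="How do I learn Rust?", url="https://www.quora.com/How-do-I-learn-Rust", rank=1))
pipeline.add_data(SearchData(name="How do I learn Rust?", url="https://www.quora.com/How-do-I-learn-Rust", rank=2))  # logged and dropped as a duplicate

# Flush anything still in the queue to example-results.csv.
pipeline.close_pipeline()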
Here is the updated code with the DataPipeline
in the main,import osimport csvimport jsonimport loggingimport timeimport stringfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.chrome.service import Servicefrom selenium.webdriver.chrome.options import Optionsfrom selenium.webdriver.common.keys import Keysfrom concurrent.futures import ThreadPoolExecutorfrom dataclasses import dataclass, fields, asdictfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as EC # Set up logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # Selenium configuration # Set the path to your ChromeDriverCHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary # Configure the service to use the specified driverservice = Service(CHROMEDRIVER_PATH) # Setup Chrome options for headless browsingoptions = Options()options.add_argument("--headless")options.add_argument("--disable-gpu") # Required for headless mode in some environmentsoptions.add_argument("--no-sandbox") # Especially useful for Linux environmentsoptions.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systemsoptions.headless = True # Runs Chrome in headless mode (without GUI) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): value = getattr(self, field.name) if isinstance(value, str): if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): try: self.csv_file_open = True data_to_save = self.storage_queue.copy() self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] # Filter out invalid characters from the filename valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits) valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars) logger.info(valid_filename) file_exists = ( os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0 ) if not file_exists: with open(valid_filename, 'w', newline='') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) writer.writeheader() with open( valid_filename, mode="a", newline="", encoding="utf-8" ) as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False except Exception as e: logger.error(f"Error saving csv {e}") def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): logger.info("adding data") logger.info(scraped_data) if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if ( len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open ): self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3): # Use a context manager to ensure the driver is properly closed with webdriver.Chrome(service=service, options=options) as driver: formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 logger.info(f"page {page_number}") url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" success = False tries = 0 while tries < retries and not success: try: driver.get(url) logger.info(f"Accessing {url}") # Use explicit wait to ensure elements are loaded wait = WebDriverWait(driver, 10) wait.until(EC.presence_of_element_located((By.ID, "rso"))) # Extract search result cards for i in range(1, 11): try: # Attempt primary XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href") except: try: # Fallback XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href") except Exception as e: continue search_data = SearchData( name=name, url=link, rank=result_number + i # Increment rank per result ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing {url}: {e}") tries += 1 if tries >= retries: logger.error(f"Max retries exceeded for {url}") else: logger.info(f"Retrying {url} ({tries}/{retries})") time.sleep(2) logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}") def start_scrape(keyword, pages, data_pipeline=None, retries=3): for page in range(pages): scrape_search_results(keyword, page, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 5 logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv")
To speed things up, we use ThreadPoolExecutor to run concurrent scraping on multiple pages:
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.submit(scrape_search_results, keyword, page_number)
The start_scrape function would become:
def start_scrape(keyword, pages, data_pipeline=None, max_threads=5, retries=3):
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = []
        for page in range(pages):
            # No need to pass the driver anymore, each thread will create its own
            futures.append(
                executor.submit(
                    scrape_search_results,
                    keyword,
                    page,
                    data_pipeline,
                    retries,
                )
            )

        # Ensure all threads complete
        for future in futures:
            future.result()  # This blocks until the thread finishes
import osimport csvimport jsonimport loggingimport timeimport stringfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.chrome.service import Servicefrom selenium.webdriver.chrome.options import Optionsfrom selenium.webdriver.common.keys import Keysfrom concurrent.futures import ThreadPoolExecutorfrom dataclasses import dataclass, fields, asdictfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as EC # Set up logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # Selenium configuration # Set the path to your ChromeDriverCHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary # Configure the service to use the specified driverservice = Service(CHROMEDRIVER_PATH) # Setup Chrome options for headless browsingoptions = Options()options.add_argument("--headless")options.add_argument("--disable-gpu") # Required for headless mode in some environmentsoptions.add_argument("--no-sandbox") # Especially useful for Linux environmentsoptions.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systemsoptions.headless = True # Runs Chrome in headless mode (without GUI) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): value = getattr(self, field.name) if isinstance(value, str): if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): try: self.csv_file_open = True data_to_save = self.storage_queue.copy() self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] # Filter out invalid characters from the filename valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits) valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars) logger.info(valid_filename) file_exists = ( os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0 ) if not file_exists: with open(valid_filename, 'w', newline='') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) writer.writeheader() with open( valid_filename, mode="a", newline="", encoding="utf-8" ) as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False except Exception as e: logger.error(f"Error saving csv {e}") def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): logger.info("adding data") logger.info(scraped_data) if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if ( len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open ): self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3): # Use a context manager to ensure the driver is properly closed with webdriver.Chrome(service=service, options=options) as driver: formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 logger.info(f"page {page_number}") url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" success = False tries = 0 while tries < retries and not success: try: driver.get(url) logger.info(f"Accessing {url}") # Use explicit wait to ensure elements are loaded wait = WebDriverWait(driver, 10) wait.until(EC.presence_of_element_located((By.ID, "rso"))) # Extract search result cards for i in range(1, 11): try: # Attempt primary XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href") except: try: # Fallback XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href") except Exception as e: continue search_data = SearchData( name=name, url=link, rank=result_number + i # Increment rank per result ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing {url}: {e}") tries += 1 if tries >= retries: logger.error(f"Max retries exceeded for {url}") else: logger.info(f"Retrying {url} ({tries}/{retries})") time.sleep(2) logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}") def start_scrape( keyword, pages, data_pipeline=None, max_threads=5, retries=3): with ThreadPoolExecutor(max_workers=max_threads) as executor: futures = [] for page in range(pages): # No need to pass the driver anymore, each thread will create its own futures.append( executor.submit( scrape_search_results, keyword, page, data_pipeline, retries, ) ) # Ensure all threads complete for future in futures: future.result() # This blocks until the thread finishes if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 5 logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv")
Now set PAGES to the desired number and initiate a production run. Tweak the following constants as needed:
MAX_THREADS = 5
PAGES = 5
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5

    logger.info("Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    # Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info("Crawl complete.")
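If you want to reproduce the timing figure quoted below, a simple wall-clock measurement around start_scrape is enough; this sketch assumes the constants, keyword, and crawl_pipeline from the main block above:
import time

crawl_start = time.perf_counter()
start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
elapsed = time.perf_counter() - crawl_start
print(f"{elapsed:.3f} seconds for {PAGES} pages ({elapsed / PAGES:.4f} seconds per page)")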
The crawl of results from Google took 16.469 seconds for 5 pages: 16.469 / 5 = 3.2938 seconds per page.
The process_post function is responsible for visiting a Quora post, waiting for the content to load, and then extracting the answers. It uses Selenium to interact with the dynamically loaded elements on the Quora page. Here's how it works:
def process_post(row, retries=3):
    with webdriver.Chrome(service=service, options=options) as driver:
        logger.info(f"Processing row: {row}")
        url = row.get("url")
        if not url:
            logger.error(f"No URL found in row: {row}")
            return

        success = False
        tries = 0

        while tries < retries and not success:
            try:
                # Step 1: Open the URL and wait for the main content to load
                driver.get(url)
                logger.info(f"Accessing {url}")
                wait = WebDriverWait(driver, 10)
                main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))

                # Step 2: Extract answer cards
                answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
                if not answer_cards:
                    logger.warning(f"No answer cards found at {url}")

                # Step 3: Initialize a DataPipeline to store replies
                if 'name' not in row:
                    logger.error(f"'name' key missing in row: {row}")
                    break

                last_seen_name = ""

                # Step 4: Loop through each answer card and extract name and reply
                for answer_card in answer_cards:
                    try:
                        name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
                        name = name_element.text.replace("\n", "").strip()

                        reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
                        reply = reply_element.text.strip()

                        # Filter out promoted content and related questions
                        if "Sponsored" in name:
                            continue
                        if "Related questions" in name:
                            break
                        if name == last_seen_name:
                            continue

                        last_seen_name = name
                        print("name:", name)
                        print("reply:", reply)
                    except Exception as e:
                        continue

                success = True
            except Exception as e:
                logger.error(f"Exception thrown while processing {url}: {e}")
                tries += 1
                if tries >= retries:
                    logger.error(f"Max retries exceeded for {url}")
                else:
                    logger.info(f"Retrying {url} ({tries}/{retries})")
                    time.sleep(2)
This is driven by the process_results function, which loads URLs from a CSV file and calls process_post to scrape each post.
def process_results(csv_file, max_threads=5, retries=3):
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        logger.info("file opened")

        for row in reader:
            process_post(row, retries)
process_post
function to scrape the content.import osimport csvimport jsonimport loggingimport timeimport stringfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.chrome.service import Servicefrom selenium.webdriver.chrome.options import Optionsfrom selenium.webdriver.common.keys import Keysfrom concurrent.futures import ThreadPoolExecutorfrom dataclasses import dataclass, fields, asdictfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as EC # Set up logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # Selenium configuration # Set the path to your ChromeDriverCHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary # Configure the service to use the specified driverservice = Service(CHROMEDRIVER_PATH) # Setup Chrome options for headless browsingoptions = Options()options.add_argument("--headless")options.add_argument("--disable-gpu") # Required for headless mode in some environmentsoptions.add_argument("--no-sandbox") # Especially useful for Linux environmentsoptions.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systemsoptions.headless = True # Runs Chrome in headless mode (without GUI) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): value = getattr(self, field.name) if isinstance(value, str): if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): try: self.csv_file_open = True data_to_save = self.storage_queue.copy() self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] # Filter out invalid characters from the filename valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits) valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars) logger.info(valid_filename) file_exists = ( os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0 ) if not file_exists: with open(valid_filename, 'w', newline='') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) writer.writeheader() with open( valid_filename, mode="a", newline="", encoding="utf-8" ) as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False except Exception as e: logger.error(f"Error saving csv {e}") def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): logger.info("adding data") logger.info(scraped_data) if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if ( len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open ): self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3): # Use a context manager to ensure the driver is properly closed with webdriver.Chrome(service=service, options=options) as driver: formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 logger.info(f"page {page_number}") url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" success = False tries = 0 while tries < retries and not success: try: driver.get(url) logger.info(f"Accessing {url}") # Use explicit wait to ensure elements are loaded wait = WebDriverWait(driver, 10) wait.until(EC.presence_of_element_located((By.ID, "rso"))) # Extract search result cards for i in range(1, 11): try: # Attempt primary XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href") except: try: # Fallback XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href") except Exception as e: continue search_data = SearchData( name=name, url=link, rank=result_number + i # Increment rank per result ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing {url}: {e}") tries += 1 if tries >= retries: logger.error(f"Max retries exceeded for {url}") else: logger.info(f"Retrying {url} ({tries}/{retries})") time.sleep(2) logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}") def start_scrape( keyword, pages, data_pipeline=None, max_threads=5, retries=3): with ThreadPoolExecutor(max_workers=max_threads) as executor: futures = [] for page in range(pages): # No need to pass the driver anymore, each thread will create its own futures.append( executor.submit( scrape_search_results, keyword, page, data_pipeline, retries, ) ) # Ensure all threads complete for future in futures: future.result() # This blocks until the thread finishes def process_post(row, retries=3): with webdriver.Chrome(service=service, options=options) as driver: logger.info(f"Processing row: {row}") url = row.get("url") if not url: logger.error(f"No URL found in row: {row}") return success = False tries = 0 while tries < retries and not success: try: # Step 1: Open the URL and wait for the main content to load driver.get(url) logger.info(f"Accessing {url}") wait = WebDriverWait(driver, 10) main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']"))) # Step 2: Extract answer cards answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper") if not answer_cards: logger.warning(f"No answer cards found at {url}") # Step 3: Initialize a DataPipeline to store replies if 'name' not in row: logger.error(f"'name' 
key missing in row: {row}") break last_seen_name = "" # Step 4: Loop through each answer card and extract name and reply for answer_card in answer_cards: try: name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative") name = name_element.text.replace("\n", "").strip() reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content") reply = reply_element.text.strip() # Filter out promoted content and related questions if "Sponsored" in name: continue if "Related questions" in name: break if name == last_seen_name: continue last_seen_name = name print("name:", name) print("reply:", reply) except Exception as e: continue success = True except Exception as e: logger.error(f"Exception thrown while processing {url}: {e}") tries += 1 if tries >= retries: logger.error(f"Max retries exceeded for {url}") else: logger.info(f"Retrying {url} ({tries}/{retries})") time.sleep(2) def process_results(csv_file, max_threads=5, retries=3): with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) logger.info(f"file opened") for row in reader: process_post(row, retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 5 logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
To store the extracted answers, we use the ReplyData class and the DataPipeline class. Each scraped answer is stored as an instance of ReplyData, ensuring that the scraped content is well-structured.
ReplyData Class:
@dataclass
class ReplyData:
    name: str = ""
    reply: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if isinstance(value, str):
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    setattr(self, field.name, value.strip())
The ReplyData class is used to store the name of the user and the content of their reply in a structured format. The check_string_fields method ensures that empty or malformed strings are handled by assigning a default value or removing unnecessary whitespace. Each ReplyData instance is passed to the DataPipeline
class for storage in a CSV file.The full code would be:import osimport csvimport jsonimport loggingimport timeimport stringfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.chrome.service import Servicefrom selenium.webdriver.chrome.options import Optionsfrom selenium.webdriver.common.keys import Keysfrom concurrent.futures import ThreadPoolExecutorfrom dataclasses import dataclass, fields, asdictfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as EC # Set up logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) # Selenium configuration # Set the path to your ChromeDriverCHROMEDRIVER_PATH = 'chromedriver.exe' # Adjust this to the actual path if necessary # Configure the service to use the specified driverservice = Service(CHROMEDRIVER_PATH) # Setup Chrome options for headless browsingoptions = Options()options.add_argument("--headless")options.add_argument("--disable-gpu") # Required for headless mode in some environmentsoptions.add_argument("--no-sandbox") # Especially useful for Linux environmentsoptions.add_argument("--disable-dev-shm-usage") # Helps with resource issues on some systemsoptions.headless = True # Runs Chrome in headless mode (without GUI) @dataclassclass SearchData: name: str = "" url: str = "" rank: int = 0 def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): value = getattr(self, field.name) if isinstance(value, str): if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) @dataclassclass ReplyData: name: str = "" reply: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): value = getattr(self, field.name) if isinstance(value, str): if not value: setattr(self, field.name, f"No {field.name}") else: setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): try: self.csv_file_open = True data_to_save = self.storage_queue.copy() self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] # Filter out invalid characters from the filename valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits) valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars) logger.info(valid_filename) file_exists = ( os.path.isfile(valid_filename) and os.path.getsize(valid_filename) > 0 ) if not file_exists: with open(valid_filename, 'w', newline='') as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) writer.writeheader() with open( valid_filename, mode="a", newline="", encoding="utf-8" ) as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False except Exception as e: logger.error(f"Error saving csv {e}") def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): logger.info("adding data") logger.info(scraped_data) if not self.is_duplicate(scraped_data): self.storage_queue.append(scraped_data) if ( len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open ): self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if self.storage_queue: self.save_to_csv() def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3): # Use a context manager to ensure the driver is properly closed with webdriver.Chrome(service=service, options=options) as driver: formatted_keyword = keyword.replace(" ", "+") result_number = page_number * 10 logger.info(f"page {page_number}") url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}" success = False tries = 0 while tries < retries and not success: try: driver.get(url) logger.info(f"Accessing {url}") # Use explicit wait to ensure elements are loaded wait = WebDriverWait(driver, 10) wait.until(EC.presence_of_element_located((By.ID, "rso"))) # Extract search result cards for i in range(1, 11): try: # Attempt primary XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href") except: try: # Fallback XPath name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href") except Exception as e: continue search_data = SearchData( name=name, url=link, rank=result_number + i # Increment rank per result ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing {url}: {e}") tries += 1 if tries >= retries: logger.error(f"Max retries exceeded for {url}") else: logger.info(f"Retrying {url} ({tries}/{retries})") time.sleep(2) logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}") def start_scrape( keyword, pages, data_pipeline=None, max_threads=5, retries=3): with ThreadPoolExecutor(max_workers=max_threads) as executor: futures = [] for page in range(pages): # No need to pass the driver anymore, each thread will create its own futures.append( executor.submit( scrape_search_results, keyword, page, data_pipeline, retries, ) ) # Ensure all threads complete for future in futures: future.result() # This blocks until the thread finishes def process_post(row, retries=3): with webdriver.Chrome(service=service, options=options) as driver: logger.info(f"Processing row: {row}") url = row.get("url") if not url: logger.error(f"No URL found in row: {row}") return logger.info(f"Processing URL: {url}") success = False tries = 0 while tries < retries and not success: try: driver.get(url) logger.info(f"Accessing {url}") # Use explicit wait to ensure main content is loaded wait = WebDriverWait(driver, 10) main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']"))) # Extract answer cards answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper") if not answer_cards: logger.warning(f"No answer cards found at {url}") # Initialize a new DataPipeline for replies if 'name' not in row: 
logger.error(f"'name' key missing in row: {row}") break answer_pipeline = DataPipeline( csv_filename=f"{row['name'].replace(' ', '-')}.csv" ) last_seen_name = "" for answer_card in answer_cards: try: name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative") name = name_element.text.replace("\n", "").strip() reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content") reply = reply_element.text.strip() if "Sponsored" in name: continue if "Related questions" in name: break if name == last_seen_name: continue last_seen_name = name reply_data = ReplyData(name=name, reply=reply) answer_pipeline.add_data(reply_data) except Exception as e: continue answer_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown while processing {url}: {e}") tries += 1 if tries >= retries: logger.error(f"Max retries exceeded for {url}") else: logger.info(f"Retrying {url} ({tries}/{retries})") time.sleep(2) def process_results(csv_file, max_threads=5, retries=3): with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) logger.info(f"file opened") for row in reader: process_post(row, retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 PAGES = 5 logger.info(f"Crawl starting...") # INPUT ---> List of keywords to scrape keyword_list = ["learn rust"] aggregate_files = [] # Job Processes for keyword in keyword_list: filename = keyword.replace(" ", "-") crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv") start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() aggregate_files.append(f"{filename}.csv") logger.info(f"Crawl complete.") for file in aggregate_files: process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
Next, we update the process_results function to use ThreadPoolExecutor. This allows the scraper to handle multiple posts at once, significantly speeding up the process.
from concurrent.futures import ThreadPoolExecutor

def process_results(csv_file, max_threads=5, retries=3):
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        logger.info("file opened")

        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            for row in reader:
                executor.submit(process_post, row, retries)
The ThreadPoolExecutor is used to run multiple threads, allowing the scraper to process several Quora posts simultaneously. The max_workers parameter defines the number of threads running concurrently. Each thread calls the process_post
Here is the full production code:

```python
import os
import csv
import json
import logging
import time
import string
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, fields, asdict
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Selenium configuration

# Set the path to your ChromeDriver
CHROMEDRIVER_PATH = 'chromedriver.exe'  # Adjust this to the actual path if necessary

# Configure the service to use the specified driver
service = Service(CHROMEDRIVER_PATH)

# Setup Chrome options for headless browsing
options = Options()
options.add_argument("--headless")  # Runs Chrome in headless mode (without GUI)
options.add_argument("--disable-gpu")  # Required for headless mode in some environments
options.add_argument("--no-sandbox")  # Especially useful for Linux environments
options.add_argument("--disable-dev-shm-usage")  # Helps with resource issues on some systems


@dataclass
class SearchData:
    name: str = ""
    url: str = ""
    rank: int = 0

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if isinstance(value, str):
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    setattr(self, field.name, value.strip())


@dataclass
class ReplyData:
    name: str = ""
    reply: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if isinstance(value, str):
                if not value:
                    setattr(self, field.name, f"No {field.name}")
                else:
                    setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        try:
            self.csv_file_open = True
            data_to_save = self.storage_queue.copy()
            self.storage_queue.clear()
            if not data_to_save:
                return

            keys = [field.name for field in fields(data_to_save[0])]

            # Filter out invalid characters from the filename
            valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
            valid_filename = ''.join(c for c in self.csv_filename if c in valid_chars)
            logger.info(valid_filename)

            file_exists = (
                os.path.isfile(valid_filename)
                and os.path.getsize(valid_filename) > 0
            )

            if not file_exists:
                with open(valid_filename, 'w', newline='') as output_file:
                    writer = csv.DictWriter(output_file, fieldnames=keys)
                    writer.writeheader()

            with open(
                valid_filename, mode="a", newline="", encoding="utf-8"
            ) as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                for item in data_to_save:
                    writer.writerow(asdict(item))

            self.csv_file_open = False
        except Exception as e:
            logger.error(f"Error saving csv {e}")

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        logger.info("adding data")
        logger.info(scraped_data)
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if (
                len(self.storage_queue) >= self.storage_queue_limit
                and not self.csv_file_open
            ):
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()


def scrape_search_results(keyword, page_number, data_pipeline=None, retries=3):
    # Use a context manager to ensure the driver is properly closed
    with webdriver.Chrome(service=service, options=options) as driver:
        formatted_keyword = keyword.replace(" ", "+")
        result_number = page_number * 10
        logger.info(f"page {page_number}")
        url = f"https://www.google.com/search?q={formatted_keyword}%20site%3Aquora.com&start={result_number}"

        success = False
        tries = 0

        while tries < retries and not success:
            try:
                driver.get(url)
                logger.info(f"Accessing {url}")

                # Use explicit wait to ensure elements are loaded
                wait = WebDriverWait(driver, 10)
                wait.until(EC.presence_of_element_located((By.ID, "rso")))

                # Extract search result cards
                for i in range(1, 11):
                    try:
                        # Attempt primary XPath
                        name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a/h3").text
                        link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div[1]/div/div/span/a").get_attribute("href")
                    except:
                        try:
                            # Fallback XPath
                            name = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a/h3").text
                            link = driver.find_element(By.XPATH, f"//*[@id='rso']/div[{i}]/div/div/div/div[1]/div/div/span/a").get_attribute("href")
                        except Exception as e:
                            continue

                    search_data = SearchData(
                        name=name,
                        url=link,
                        rank=result_number + i  # Increment rank per result
                    )
                    data_pipeline.add_data(search_data)

                logger.info(f"Successfully parsed data from: {url}")
                success = True

            except Exception as e:
                logger.error(f"An error occurred while processing {url}: {e}")
                tries += 1
                if tries >= retries:
                    logger.error(f"Max retries exceeded for {url}")
                else:
                    logger.info(f"Retrying {url} ({tries}/{retries})")
                    time.sleep(2)

        logger.info(f"Storage queue length after page {page_number}: {len(data_pipeline.storage_queue)}")


def start_scrape(keyword, pages, data_pipeline=None, max_threads=5, retries=3):
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = []
        for page in range(pages):
            # No need to pass the driver anymore, each thread will create its own
            futures.append(
                executor.submit(
                    scrape_search_results,
                    keyword,
                    page,
                    data_pipeline,
                    retries,
                )
            )
        # Ensure all threads complete
        for future in futures:
            future.result()  # This blocks until the thread finishes


def process_post(row, retries=3):
    with webdriver.Chrome(service=service, options=options) as driver:
        logger.info(f"Processing row: {row}")
        url = row.get("url")
        if not url:
            logger.error(f"No URL found in row: {row}")
            return

        logger.info(f"Processing URL: {url}")
        success = False
        tries = 0

        while tries < retries and not success:
            try:
                driver.get(url)
                logger.info(f"Accessing {url}")

                # Use explicit wait to ensure main content is loaded
                wait = WebDriverWait(driver, 10)
                main_content = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[id='mainContent']")))

                # Extract answer cards
                answer_cards = main_content.find_elements(By.CSS_SELECTOR, "div.q-click-wrapper")
                if not answer_cards:
                    logger.warning(f"No answer cards found at {url}")

                # Initialize a new DataPipeline for replies
                if 'name' not in row:
                    logger.error(f"'name' key missing in row: {row}")
                    break

                answer_pipeline = DataPipeline(
                    csv_filename=f"{row['name'].replace(' ', '-')}.csv"
                )

                last_seen_name = ""
                for answer_card in answer_cards:
                    try:
                        name_element = answer_card.find_element(By.CSS_SELECTOR, "div.q-relative")
                        name = name_element.text.replace("\n", "").strip()

                        reply_element = answer_card.find_element(By.CSS_SELECTOR, "div.spacing_log_answer_content")
                        reply = reply_element.text.strip()

                        if "Sponsored" in name:
                            continue
                        if "Related questions" in name:
                            break
                        if name == last_seen_name:
                            continue

                        last_seen_name = name
                        reply_data = ReplyData(name=name, reply=reply)
                        answer_pipeline.add_data(reply_data)
                    except Exception as e:
                        continue

                answer_pipeline.close_pipeline()
                success = True

            except Exception as e:
                logger.error(f"Exception thrown while processing {url}: {e}")
                tries += 1
                if tries >= retries:
                    logger.error(f"Max retries exceeded for {url}")
                else:
                    logger.info(f"Retrying {url} ({tries}/{retries})")
                    time.sleep(2)


def process_results(csv_file, max_threads=5, retries=3):
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        logger.info(f"file opened")

        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            for row in reader:
                executor.submit(process_post, row, retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5

    logger.info(f"Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    # Job Processes
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")

    for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
With all of the pieces in place, the main function would be:

```python
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5

    logger.info(f"Crawl starting...")

    # INPUT ---> List of keywords to scrape
    keyword_list = ["learn rust"]
    aggregate_files = []

    # Job Processes: Scraping Search Results
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")

        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    logger.info(f"Crawl complete.")

    # Processing Scraped Quora Posts
    for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)
```
For each keyword in `keyword_list`, the scraper collects search results from Google and stores them in a CSV file. The `process_results` function then processes each Quora post concurrently, using multiple threads for efficiency. Adjust `MAX_RETRIES`, `MAX_THREADS`, and `PAGES` to fine-tune the scraper's performance. More threads will increase speed, but be mindful of server load and anti-bot measures.
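If you find yourself tweaking these constants often, one option is to expose them as command-line flags instead of editing the file. This is an optional sketch, not part of the original scraper, and the flag names are purely illustrative:

```python
import argparse

# Hypothetical CLI wrapper around the constants used in the main block above.
parser = argparse.ArgumentParser(description="Quora scraper settings")
parser.add_argument("--retries", type=int, default=3, help="Max retries per request")
parser.add_argument("--threads", type=int, default=5, help="Number of concurrent threads")
parser.add_argument("--pages", type=int, default=5, help="Google result pages per keyword")
parser.add_argument("--keywords", nargs="+", default=["learn rust"], help="Keywords to crawl")
args = parser.parse_args()

MAX_RETRIES = args.retries
MAX_THREADS = args.threads
PAGES = args.pages
keyword_list = args.keywords
```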
The full run on all the Quora posts took 651.258 seconds, of which 16.469 seconds were spent on the Google results crawl. The post scraping therefore took 651.258 - 16.469 = 634.789 seconds. With 50 posts scraped, that works out to 634.789 / 50, or roughly 12.7 seconds per post.
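If you want to produce the same kind of numbers for your own runs, you can wrap the two phases of the main block with `time.time()` calls. This is a minimal sketch that assumes the functions and constants from the full code above:

```python
import time

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    PAGES = 5
    keyword_list = ["learn rust"]
    aggregate_files = []

    crawl_start = time.time()

    # Phase 1: crawl Google for Quora posts (same as the main block above)
    for keyword in keyword_list:
        filename = keyword.replace(" ", "-")
        crawl_pipeline = DataPipeline(csv_filename=f"{filename}.csv")
        start_scrape(keyword, PAGES, data_pipeline=crawl_pipeline,
                     max_threads=MAX_THREADS, retries=MAX_RETRIES)
        crawl_pipeline.close_pipeline()
        aggregate_files.append(f"{filename}.csv")

    crawl_end = time.time()

    # Phase 2: scrape the individual posts
    for file in aggregate_files:
        process_results(file, max_threads=MAX_THREADS, retries=MAX_RETRIES)

    scrape_end = time.time()

    logger.info(f"Google crawl took {crawl_end - crawl_start:.3f} seconds")
    logger.info(f"Post scraping took {scrape_end - crawl_end:.3f} seconds")
```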
When you scrape the web, always pay attention to a site's Terms of Service and its robots.txt. Legal or not, when you violate a site's terms, you can get suspended or even permanently banned. Public data is typically free to scrape, but be cautious when dealing with private or gated content. When scraping Quora, be mindful of their Terms of Service and review their robots.txt file, and ensure that your scraping activities do not violate legal or ethical guidelines. When scraping private data, you are subject to the site's terms and the privacy laws of the site's jurisdiction. If you don't know whether your scraper is legal, you should consult an attorney.
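If you want to check a URL against robots.txt programmatically before requesting it, Python's standard-library `urllib.robotparser` can do this. A minimal sketch; the user agent string and example URL below are placeholders:

```python
from urllib.robotparser import RobotFileParser

# Fetch and parse Quora's robots.txt.
robots = RobotFileParser("https://www.quora.com/robots.txt")
robots.read()

# Check whether our (example) user agent may fetch an (example) URL.
url = "https://www.quora.com/How-do-I-learn-Rust"
if robots.can_fetch("my-scraper-bot", url):
    print(f"robots.txt allows fetching: {url}")
else:
    print(f"robots.txt disallows fetching: {url}")
```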