Then check out ScrapeOps, the complete toolkit for web scraping.
To run the scraper below, create a new project folder with a config.json
file inside. Add your ScrapeOps API key to the config file:

{"api_key": "your-super-secret-api-key"}

Then copy the code into a Python file and run it with:

python name_of_your_file.py
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    stars: int = 0
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[role='listitem']")

            Excluded_words = ["Apps & games", "Movies & TV", "Books"]
            for div_card in div_cards:
                if div_card.text in Excluded_words:
                    continue

                info_rows = div_card.select("div div span")

                name = info_rows[1].text
                publisher = info_rows[2].text
                href = div_card.find("a").get("href")
                link = f"https://play.google.com{href}"
                rating = 0.0
                if info_rows[3].text != None:
                    rating = info_rows[3].text

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=link,
                    publisher=publisher
                )

                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [location] * len(keywords),
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )


def process_app(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_container = soup.select_one("div[data-g-id='reviews']")
                review_headers = review_container.find_all("header")

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

                for review in review_headers:
                    stars = len(review.find_all("svg"))
                    card = review.parent
                    divs = card.select("div div div div div")
                    name = divs[1].text
                    date = divs[10].text
                    description = divs[12].text

                    review_data = ReviewData(
                        name=name,
                        date=date,
                        stars=stars,
                        description=description
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_app,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet", "web3 wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"
    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")

    logger.info("Starting scrape...")
    process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    logger.info("Scrape Complete")
To change your results, you can alter any of the following constants from main:

MAX_RETRIES: Sets the maximum number of retry attempts the script will make if a request fails.
MAX_THREADS: Sets the maximum number of threads (or concurrent tasks) that the script will use when scraping data.
LOCATION: Specifies the country code for the location from which you want to simulate the scraping requests.
keyword_list: A list of keywords or phrases that the script will use to search for listings on the store.

When we search the Google Play Store, our URLs are laid out like this:

https://play.google.com/store/search?q={keyword}&c=apps
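For example, here is a quick sketch of how a keyword from our list turns into one of these search URLs (the keyword is just an example value):

keyword = "crypto wallet"
formatted_keyword = keyword.replace(" ", "+")  # spaces become "+"
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
print(url)
# https://play.google.com/store/search?q=crypto+wallet&c=apps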
Each item in the search results sits in a div with a role of listitem. When we search for this item, we'll be using the CSS selector div[role='listitem']. From there, we can pull all of the data we need.

Reviews are held inside a div container with a data-g-id of reviews. From within this container, we're going to go through and pull all of our reviews.

With the ScrapeOps Proxy API, we can control our geolocation using the country param. When we choose a country, ScrapeOps will route us through a server within that country. To appear in the US, for example, we pass "country": "us".

We also use another parameter, residential. This one is a boolean. If we set "residential": True, ScrapeOps will assign us a residential IP address, which greatly decreases our likelihood of getting blocked.

Start by creating a new project folder and moving into it:

mkdir google-play-scraper
cd google-play-scraper
python -m venv venv
source venv/bin/activate
pip install requests
pip install beautifulsoup4
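If you'd like to confirm the environment is ready before going further, a quick smoke test like the one below should run without errors (the URL is just a placeholder; any reachable page will do):

import requests
from bs4 import BeautifulSoup

# Fetch a page and parse it to confirm both packages are installed and working.
response = requests.get("https://example.com")
soup = BeautifulSoup(response.text, "html.parser")
print(response.status_code, soup.title.text)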
Our first iteration is built around a single parsing function, scrape_search_results(). Here is our starter script.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[role='listitem']")

            Excluded_words = ["Apps & games", "Movies & TV", "Books"]
            for div_card in div_cards:
                if div_card.text in Excluded_words:
                    continue

                info_rows = div_card.select("div div span")

                name = info_rows[1].text
                publisher = info_rows[2].text
                href = div_card.find("a").get("href")
                link = f"https://play.google.com{href}"
                rating = 0.0
                if info_rows[3].text != None:
                    rating = info_rows[3].text

                search_data = {
                    "name": name,
                    "stars": rating,
                    "url": link,
                    "publisher": publisher
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, retries=3):
    for keyword in keywords:
        scrape_search_results(keyword, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"
    start_scrape(keyword_list, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
Inside of scrape_search_results(), while the operation hasn't succeeded, we do the following:

div_cards = soup.select("div[role='listitem']") finds all div tags with the role, listitem.
We then iterate through the div cards.
info_rows = div_card.select("div div span") finds all of the rows inside each result card.
We pull the name, publisher and rating from our info_rows.
We find the href element with href = div_card.find("a").get("href") and use some basic string formatting to reconstruct the full link.
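If you want to experiment with these selectors outside the full script, here is a minimal sketch of the same extraction steps, assuming you've saved a Google Play search page locally as search.html (a hypothetical file name):

from bs4 import BeautifulSoup

# Parse a locally saved search results page and print each result card.
with open("search.html", "r", encoding="utf-8") as f:
    soup = BeautifulSoup(f.read(), "html.parser")

for div_card in soup.select("div[role='listitem']"):
    info_rows = div_card.select("div div span")
    a_tag = div_card.find("a")
    if len(info_rows) < 4 or a_tag is None:
        continue
    print({
        "name": info_rows[1].text,
        "publisher": info_rows[2].text,
        "stars": info_rows[3].text,
        "url": f"https://play.google.com{a_tag.get('href')}"
    })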
To store and process this data, we add two classes: SearchData and DataPipeline. Here is SearchData; we'll use it to hold data for individual search items.

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
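To see what __post_init__() does in practice, here is a tiny, hypothetical usage demo (the values are made up):

# Hypothetical values, just to demonstrate the clean-up behavior.
item = SearchData(name="  My Wallet App  ", stars=4.5, url="", publisher="Some Publisher")
print(item.name)  # "My Wallet App" -- surrounding whitespace is stripped
print(item.url)   # "No url" -- empty strings get default text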
Our other new class is DataPipeline. The DataPipeline will be used to open a pipeline to a CSV file. This pipeline takes in dataclass objects and pipes them to the CSV file while removing duplicate ones.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            # requires "import time" at the top of the script
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
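As a rough usage sketch (not part of the tutorial code), the pipeline gets used like this: construct it with a filename, feed it dataclass objects, then close it so anything left in the queue gets flushed. The values below are made up:

# Rough usage sketch of the DataPipeline class above.
pipeline = DataPipeline(csv_filename="example.csv")
pipeline.add_data(SearchData(name="Some App", stars=4.2, url="https://play.google.com/store/apps/details?id=example", publisher="Some Dev"))
pipeline.add_data(SearchData(name="Some App", stars=4.2, url="https://play.google.com/store/apps/details?id=example", publisher="Some Dev"))  # duplicate name, gets dropped
pipeline.close_pipeline()  # flushes whatever is left in the queue to example.csv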
In the updated code below, we open a DataPipeline and pass it into start_scrape(), which in turn passes it into scrape_search_results(). From within scrape_search_results(), instead of printing our data to the terminal, we use it to create a SearchData object. That object then gets passed into our DataPipeline
.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.select("div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find("a").get("href") link = f"https://play.google.com{href}" rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=link, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, retries=3): for keyword in keywords: scrape_search_results(keyword, location, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
In the code above, crawl_pipeline = DataPipeline(csv_filename=filename) creates a DataPipeline. Inside scrape_search_results(), we turn our parsed data into a SearchData object and pass it into the pipeline. Once the crawl has finished, we close the pipeline with crawl_pipeline.close_pipeline().
Up to this point, we've been using a for loop to iterate through our keyword_list. In this section, we're going to replace that for loop with ThreadPoolExecutor, which gives us the power of multithreading. Here is our refactored start_scrape() function.

def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [location] * len(keywords),
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )
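If executor.map() is new to you, this tiny standalone sketch (a toy function and made-up values, unrelated to the scraper) shows how it pairs the lists up, taking one element from each list per call:

from concurrent.futures import ThreadPoolExecutor

def fake_job(keyword, location, retries):
    return f"{keyword} | {location} | {retries}"

keywords = ["crypto wallet", "web3 wallet"]
with ThreadPoolExecutor(max_workers=2) as executor:
    # Calls fake_job("crypto wallet", "us", 3) and fake_job("web3 wallet", "us", 3) on separate threads.
    for result in executor.map(fake_job, keywords, ["us"] * len(keywords), [3] * len(keywords)):
        print(result)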
We pass the following arguments into executor.map():

scrape_search_results is the function we'd like to call on our available threads.
keywords is an array of keywords we want to search.
All of our other arguments get passed in as arrays, which then get fed into scrape_search_results
on each call.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: response = requests.get(url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.select("div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find("a").get("href") link = f"https://play.google.com{href}" rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=link, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [location] * len(keywords), [data_pipeline] * len(keywords), [retries] * len(keywords) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
The big change here: we replaced the for loop inside start_scrape() with ThreadPoolExecutor, and executor.map() is the function we use to call scrape_search_results() across our available threads.

To keep Google from blocking us, we route our requests through the ScrapeOps Proxy API. get_scrapeops_url() takes in a regular URL and converts it into a proxied one.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
"api_key"
: holds our ScrapeOps API key."url"
: is the url that we'd like to scrape."country"
: is the country we'd like to appear in."wait"
: is how long we want the ScrapeOps server to wait before sending our response back."residential"
: is a boolean that lets ScrapeOps know if we want a residential IP. If we set it to True
, we get a residnetial IP instead of a datacenter IP address. This greatly decreases our likelihood of getting blocked.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.select("div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find("a").get("href") link = f"https://play.google.com{href}" rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=link, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [location] * len(keywords), [data_pipeline] * len(keywords), [retries] * len(keywords) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
If you'd like to tweak your results, you can change the constants inside of main. Aside from that, everything else will stay the same. Take a look below.

if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet", "bitcoin wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"
    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")
Feel free to change any of the following:

MAX_THREADS
MAX_RETRIES
LOCATION
keyword_list
With the crawler producing a report of apps, we now need a scraper that reads that report and pulls the reviews for each app. Here is the first version of process_app().

def process_app(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(url)
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_container = soup.select_one("div[data-g-id='reviews']")
                review_headers = review_container.find_all("header")

                for review in review_headers:
                    stars = len(review.find_all("svg"))
                    card = review.parent
                    divs = card.select("div div div div div")
                    name = divs[1].text
                    date = divs[10].text
                    description = divs[12].text

                    review_data = {
                        "name": name,
                        "date": date,
                        "stars": stars,
                        "description": description
                    }
                    print(review_data)

                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
Inside process_app(), we do the following:

We find our review_container with soup.select_one("div[data-g-id='reviews']").
We find the header elements for each review, review_container.find_all("header").
We then iterate through the review_headers.
From each header, we pull the following information:
The stars: stars = len(review.find_all("svg")).
The divs holding the review details, card.select("div div div div div").
The name, date and description from the list of divs.

We also need a function that reads the CSV report and runs process_app() on each row, similar to start_scrape(). We'll call this one process_results(). Here is process_results(). Later on, we'll replace the for loop with multithreading like we did before.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        for row in reader:
            process_app(row, location, retries=retries)
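Since csv.DictReader drives this step, each row handed to process_app() is just a dict keyed by the columns our crawler wrote, roughly like this (the values are illustrative):

# Illustrative shape of a single row from report.csv.
row = {
    "name": "Some Crypto Wallet",
    "stars": "4.6",
    "url": "https://play.google.com/store/apps/details?id=example",
    "publisher": "Some Publisher"
}
print(row["url"])  # this is the page process_app() will request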
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.select("div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find("a").get("href") link = f"https://play.google.com{href}" rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=link, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [location] * len(keywords), [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_app(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_container = soup.select_one("div[data-g-id='reviews']") review_headers = review_container.find_all("header") for review in review_headers: stars = len(review.find_all("svg")) card = review.parent divs = card.select("div div div div div") name = divs[1].text date = divs[10].text description = divs[12].text review_data = { "name": name, "date": date, "stars": stars, "description": description } print(review_data) success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_app(row, 
location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting scrape...") process_results(filename, LOCATION, retries=MAX_RETRIES) logger.info("Scrape Complete")
To represent our reviews properly, we need one more dataclass. We'll call this one ReviewData. It will hold the following traits:

name
date
stars
description

@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    stars: int = 0
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
From within process_app(), we open a DataPipeline for each app and pass our ReviewData objects into it
. You can see this in our fully updated code below.import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" stars: int = 0 description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.select("div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find("a").get("href") link = f"https://play.google.com{href}" rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=link, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [location] * len(keywords), [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_app(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(url) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_container = soup.select_one("div[data-g-id='reviews']") review_headers = review_container.find_all("header") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in review_headers: stars = len(review.find_all("svg")) card = review.parent divs = card.select("div div div div div") name = divs[1].text date = divs[10].text description = divs[12].text review_data = ReviewData( name=name, date=date, stars=stars, description=description ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): 
logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_app(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting scrape...") process_results(filename, LOCATION, retries=MAX_RETRIES) logger.info("Scrape Complete")
ReviewData represents an individual review in our software. DataPipeline saves our ReviewData to a CSV file.

All that's left is to replace the for loop with ThreadPoolExecutor. Here is our refactored process_results() function.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_app,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
Once again, we pass our arguments into executor.map():

process_app is the function we want to call on multiple threads this time.
reader is the array of rows from our CSV file.
location and retries also get passed in as arrays, just like before.

get_scrapeops_url() was already defined earlier. Now, we just need to use it again. We'll change one line of our parsing function to unleash the proxy.

response = requests.get(get_scrapeops_url(url, location=location))
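To make that one-liner concrete, here is roughly what get_scrapeops_url() hands to requests (the API key and app URL are placeholders, and the printed URL is only an approximation):

# Rough illustration; never log your real API key in production.
proxied_url = get_scrapeops_url("https://play.google.com/store/apps/details?id=example", location="us")
print(proxied_url)
# e.g. https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fplay.google.com%2F...&country=us&wait=5000&residential=True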
import osimport csvimport requestsimport jsonimport loggingfrom urllib.parse import urlencodefrom bs4 import BeautifulSoupimport concurrent.futuresfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, "residential": True } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" stars: int = 0 description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = requests.get(scrapeops_proxy_url) logger.info(f"Recieved [{response.status_code}] from: {url}") if response.status_code != 200: raise Exception(f"Failed request, Status Code {response.status_code}") ## Extract Data soup = BeautifulSoup(response.text, "html.parser") div_cards = soup.select("div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.select("div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find("a").get("href") link = f"https://play.google.com{href}" rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=link, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [location] * len(keywords), [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_app(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: response = requests.get(get_scrapeops_url(url, location=location)) try: if response.status_code == 200: logger.info(f"Status: {response.status_code}") soup = BeautifulSoup(response.text, "html.parser") review_container = soup.select_one("div[data-g-id='reviews']") review_headers = review_container.find_all("header") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in review_headers: stars = len(review.find_all("svg")) card = review.parent divs = card.select("div div div div div") name = divs[1].text date = divs[10].text description = divs[12].text review_data = ReviewData( name=name, date=date, stars=stars, description=description ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True else: logger.warning(f"Failed Response: {response.status_code}") raise Exception(f"Failed Request, status code: {response.status_code}") except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, 
location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_app, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet", "web3 wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting scrape...") process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Scrape Complete")
"bitcoin wallet"
to "web3 wallet"
. Otherwise, everything else is the same. Go ahead and take a look at our main
.if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet", "web3 wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting scrape...") process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Scrape Complete")
When you scrape Google Play, you are subject to both their terms of service and their robots.txt. Violating these terms can result in suspension or even permanent removal of your account. Their terms of service are available here. You can view their robots.txt here.

Public data is generally alright to scrape. When data is public (not gated behind a login), it is public knowledge and public property. When accessing data behind a login, you are accessing private data and therefore subject to their terms. If you're not sure whether your scraper is legal, consult an attorney.

The Selenium-based version of the scraper below still reads your API key from the same config.json
To follow along, you'll need a config.json file in your project folder containing your ScrapeOps API key. If you just want to run the finished Google Play scraper, here is the full code:

import os
import csv
import json
import time
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    stars: int = 0
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            driver.get(scrapeops_proxy_url)

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']")
            excluded_words = ["Apps & games", "Movies & TV", "Books"]
            for div_card in div_cards:
                if div_card.text in excluded_words:
                    continue
                info_rows = div_card.find_elements(By.CSS_SELECTOR, "div div span")
                name = info_rows[1].text
                publisher = info_rows[2].text
                href = div_card.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
                # Default the rating to 0.0 when no rating text is present
                rating = 0.0
                if info_rows[3].text:
                    rating = info_rows[3].text

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=href,
                    publisher=publisher
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [location] * len(keywords),
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )


def process_app(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(get_scrapeops_url(url, location=location))

            review_container = driver.find_element(By.CSS_SELECTOR, "div[data-g-id='reviews']")
            review_headers = review_container.find_elements(By.CSS_SELECTOR, "header[data-review-id]")
            review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")

            for review in review_headers:
                header_text = review.text.split("\n")
                stars = review.find_element(By.CSS_SELECTOR, "div[role='img']").get_attribute("aria-label").split(" ")[1]
                name = header_text[0]
                date = header_text[2]
                description = review.find_element(By.XPATH, "..").text.split("\n")[3]

                review_data = ReviewData(
                    name=name,
                    date=date,
                    stars=stars,
                    description=description
                )
                review_pipeline.add_data(review_data)

            review_pipeline.close_pipeline()
            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_app,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet", "web3 wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"
    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")

    logger.info("Starting scrape...")
    process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    logger.info("Scrape Complete")
You can change any of the following constants inside main in order to change your scraping results:

- MAX_RETRIES: Specifies the maximum number of retry attempts for failed operations.
- MAX_THREADS: Determines the number of threads to use for concurrent execution.
- LOCATION: Specifies the geographic location for proxy requests.
- keyword_list: Contains a list of search terms to scrape from the Google Play Store.

Here is an example of a Google Play search URL:
https://play.google.com/store/search?q=crypto%20wallet&c=apps
It splits into a base URL, https://play.google.com/store/search, and a query string, q=crypto%20wallet&c=apps. q=crypto%20wallet tells Google Play that we want to search the term crypto wallet. c=apps is used to tell Google Play that we're looking at apps, not music, books, etc.
Each search result on the page is embedded in a div with the role, listitem. We can use the following CSS selector to find it: div[role='listitem']. The short sketch below puts the URL format and this selector together.
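To make the URL format and the result-card selector concrete, here is a minimal standalone sketch. It builds a search URL from a keyword and counts the matching div[role='listitem'] cards; the build_search_url helper name is our own and isn't part of the finished scraper.

from urllib.parse import quote_plus

from selenium import webdriver
from selenium.webdriver.common.by import By


def build_search_url(keyword):
    # q holds the URL-encoded search term; c=apps restricts results to apps
    return f"https://play.google.com/store/search?q={quote_plus(keyword)}&c=apps"


if __name__ == "__main__":
    options = webdriver.ChromeOptions()
    options.add_argument("--headless")

    driver = webdriver.Chrome(options=options)
    try:
        driver.get(build_search_url("crypto wallet"))
        # Each search result is wrapped in a div with role='listitem'
        cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']")
        print(f"Found {len(cards)} result cards")
    finally:
        driver.quit()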
When we're actually crawling the page, we'll use Selenium to find all of the elements matching this selector.
Each review on an app's page is held in a header element with a data-review-id attribute. However, this data-review-id changes with each review on the page. To select these items, we'll use header[data-review-id]. This finds all header elements where a data-review-id is present.
You might have noticed that the review body isn't highlighted in the screenshot. This is because it's actually a separate element from the one we just mentioned. To find the body of the review, we'll need to find the parent element of the header[data-review-id] item.
To control our geolocation, we pass the country param to Proxy Aggregator. Countries are represented by two-letter country codes. You can view a list of our supported countries below.
Country | Country Code |
---|---|
Brazil | br |
Canada | ca |
China | cn |
India | in |
Italy | it |
Japan | jp |
France | fr |
Germany | de |
Russia | ru |
Spain | es |
United States | us |
United Kingdom | uk |
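If you want results routed through a different country, the only thing that changes is the country code in the proxy payload. Here's a small self-contained sketch that reuses the article's get_scrapeops_url() helper; the API key shown is just a placeholder.

from urllib.parse import urlencode

API_KEY = "your-scrapeops-api-key"  # placeholder value


def get_scrapeops_url(url, location="us"):
    # Same helper used throughout this article; "country" takes a two-letter code from the table above
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


# Route a request through the UK instead of the default "us"
print(get_scrapeops_url("https://play.google.com/store/search?q=crypto+wallet&c=apps", location="uk"))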
Create a new project folder, then cd into the folder:

mkdir google-play-selenium
cd google-play-selenium

Next, create a virtual environment, activate it, and install Selenium:

python -m venv venv
source venv/bin/activate
pip install selenium
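One note on drivers: recent versions of Selenium (4.6+) ship with Selenium Manager, which usually downloads a matching chromedriver for your installed Chrome automatically; on older setups you may need to install chromedriver yourself. A quick smoke test like the sketch below confirms your environment can launch headless Chrome before you start scraping.

from selenium import webdriver

# Launch a headless Chrome session and fetch a page to verify the setup
options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
try:
    driver.get("https://play.google.com")
    print("Loaded:", driver.title)
finally:
    driver.quit()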
Our first script contains two functions: our parsing function, scrape_search_results(), and the function that triggers the actual scrape, start_scrape(). Everything is important, but you should pay special attention to the parsing function. This is where our actual extraction takes place.

import os
import csv
import json
import logging
from urllib.parse import urlencode
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.common.by import By
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

OPTIONS = webdriver.ChromeOptions()
OPTIONS.add_argument("--headless")
OPTIONS.add_argument("--disable-javascript")

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)

            div_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']")
            excluded_words = ["Apps & games", "Movies & TV", "Books"]
            for div_card in div_cards:
                if div_card.text in excluded_words:
                    continue
                info_rows = div_card.find_elements(By.CSS_SELECTOR, "div div span")
                name = info_rows[1].text
                publisher = info_rows[2].text
                href = div_card.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
                # Default the rating to 0.0 when no rating text is present
                rating = 0.0
                if info_rows[3].text:
                    rating = info_rows[3].text

                search_data = {
                    "name": name,
                    "stars": rating,
                    "url": href,
                    "publisher": publisher
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, retries=3):
    for keyword in keywords:
        scrape_search_results(keyword, location, retries=retries)


if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"
    start_scrape(keyword_list, LOCATION, retries=MAX_RETRIES)
    logger.info(f"Crawl complete.")
- driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") finds all of our search results.
- div_card.find_elements(By.CSS_SELECTOR, "div div span") gets all of our info_rows. Each of these rows holds an individual chunk of relevant data.
- We pull the name and publisher from the info_rows.
- div_card.find_element(By.CSS_SELECTOR, "a").get_attribute("href") gives us the link to the app's page on Google Play.
- Our rating defaults to 0.0. If there is a rating present, we reassign it to the rating variable.
- Each result gets stored in a dict, and we print that dict to the console.

This is acceptable for prototyping, but we need stronger datatypes for production and we need to pipe our data to a CSV file. In this section, we're going to replace our dict
with a more strongly typed SearchData
object. We'll also create a DataPipeline
class for the sole purpose of storing these objects.Take a look at SearchData
. It has a __post_init__()
method for instantiation and a check_string_fields()
method to ensure that no field is left empty. Its actual fields are pretty simple: name
, stars, url, and publisher.

@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
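To see what __post_init__() and check_string_fields() actually do, here's a tiny usage sketch. It assumes the SearchData dataclass above is in scope, and the values are made up.

# Construct a SearchData object by hand to observe the field cleanup
item = SearchData(name="  Some Wallet App  ", stars=4.6, url="", publisher="Some Publisher")

print(item.name)   # "Some Wallet App"  -- leading/trailing whitespace stripped
print(item.url)    # "No url"           -- empty strings are replaced with a default
print(item.stars)  # 4.6                -- non-string fields are skipped by the check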
Next, we need a place to store our SearchData. Our DataPipeline has a constructor, __init__(), along with several other methods for handling the storage_queue and actually saving the data.

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
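Here's a quick sketch of how the pipeline gets used on its own. It assumes the SearchData and DataPipeline classes above are in scope; the filename, app names, and URLs are made up for illustration.

# Open a pipeline that appends rows to example.csv
pipeline = DataPipeline(csv_filename="example.csv")

# Duplicate names get filtered out, so only two rows reach the CSV
pipeline.add_data(SearchData(name="Wallet One", stars="4.5", url="https://play.google.com/store/apps/details?id=example.one", publisher="Example Dev"))
pipeline.add_data(SearchData(name="Wallet Two", stars="4.1", url="https://play.google.com/store/apps/details?id=example.two", publisher="Example Dev"))
pipeline.add_data(SearchData(name="Wallet One", stars="4.5", url="https://play.google.com/store/apps/details?id=example.one", publisher="Example Dev"))

# Flush anything still sitting in the storage queue
pipeline.close_pipeline()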
Here is a quick breakdown of each method:

- save_to_csv(): Saves our queue to a CSV file.
- is_duplicate(): Tells us whether an item is a duplicate. We use this to filter duplicates out of the pipeline.
- add_data(): Adds data to our storage_queue.
- close_pipeline()
: Sleep for 3 seconds to allow any file operations to complete. Then, we save the queue to our output file.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) response = driver.get(url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.find_elements(By.CSS_SELECTOR, "div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=href, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, retries=3): for keyword in keywords: scrape_search_results(keyword, location, data_pipeline=data_pipeline, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
Our main reflects these new design changes. We open a DataPipeline and pass it into start_scrape(), which passes it into scrape_search_results(), where we add all of our SearchData to the pipeline.

Next, we rewrite start_scrape() to take advantage of ThreadPoolExecutor. It now takes a max_threads argument. This allows us to choose how many threads we'd like to use. Instead of iterating through our keywords, we now pass them into ThreadPoolExecutor.

def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [location] * len(keywords),
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )
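If the repeated argument lists look odd, here's a tiny standalone sketch, with a made-up greet function that has nothing to do with the scraper, showing how executor.map() pairs each element of the first list with the matching elements of the other lists:

import concurrent.futures


def greet(name, greeting, punctuation):
    print(f"{greeting}, {name}{punctuation}")


names = ["alice", "bob", "carol"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Each call receives one name plus the matching elements of the other lists.
    # Broadcasting a single value is done by repeating it len(names) times.
    executor.map(
        greet,
        names,
        ["Hello"] * len(names),
        ["!"] * len(names)
    )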
executor.map() holds the key to our concurrency. It takes the following arguments:

- scrape_search_results: The function we want to call on each thread.
- keywords: The array of keywords we want to search.
- location, data_pipeline, and retries all get passed in as arrays the length of our keywords list. Our executor then passes them into each separate thread that scrape_search_results
is running on.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) response = driver.get(url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.find_elements(By.CSS_SELECTOR, "div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=href, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [location] * len(keywords), [data_pipeline] * len(keywords), [retries] * len(keywords) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
MAX_THREADS now gets passed into start_scrape(), and start_scrape() now runs multiple instances of our parsing function concurrently.

To use Proxy Aggregator, we take our api_key, url, and location and use URL encoding to wrap them all into a ScrapeOps proxied URL. We also add a wait parameter so our dynamic content can be rendered on the page before Proxy Aggregator sends it back to us.

def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
Inside our parsing function, instead of driver.get(url), we now use driver.get(scrapeops_proxy_url)
. Our finalized crawler is available below.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.find_elements(By.CSS_SELECTOR, "div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=href, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [location] * len(keywords), [data_pipeline] * len(keywords), [retries] * len(keywords) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.")
In our main below, we update our keyword list to hold two searches now. This allows us to see how the crawler performs when handling multiple searches concurrently. Everything else in our main stays the same.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet", "web3 wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"
    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")
Next, we'll start parsing reviews from each app page. Just like before, we'll begin by printing a dict to the console like we did earlier. Our new parsing function is called process_app().

def process_app(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            driver = webdriver.Chrome(options=OPTIONS)
            driver.get(url)

            review_container = driver.find_element(By.CSS_SELECTOR, "div[data-g-id='reviews']")
            review_headers = review_container.find_elements(By.CSS_SELECTOR, "header[data-review-id]")

            for review in review_headers:
                header_text = review.text.split("\n")
                stars = review.find_element(By.CSS_SELECTOR, "div[role='img']").get_attribute("aria-label").split(" ")[1]
                name = header_text[0]
                date = header_text[2]
                description = review.find_element(By.XPATH, "..").text.split("\n")[3]

                review_data = {
                    "name": name,
                    "date": date,
                    "stars": stars,
                    "description": description
                }
                print(review_data)

            success = True

        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
        finally:
            driver.quit()

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
- driver.find_element(By.CSS_SELECTOR, "div[data-g-id='reviews']") finds the container holding all of the reviews.
- review_container.find_elements(By.CSS_SELECTOR, "header[data-review-id]") is used to find the header elements we found earlier when inspecting the page.
- review.find_element(By.CSS_SELECTOR, "div[role='img']").get_attribute("aria-label").split(" ")[1] finds our stars element and extracts the value from its aria-label. We then split this value and grab element 1, which contains the actual rating.
- We pull the name and date from the header_text.
- We use XPATH to find the parent element of the header: review.find_element(By.XPATH, "..").text.split("\n")[3]. The fourth element in the array (index number 3) is the actual description given in the review.

Next, we need a function (similar to start_scrape()) called process_results(). This function will read all the rows of our CSV report and then pass each row into the parsing function we just wrote.

def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        for row in reader:
            process_app(row, location, retries=retries)
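One detail from the bullets above worth isolating is the review-field extraction. Here's a hedged sketch of that logic as a standalone helper; parse_review_header is our own name, it takes a Selenium header[data-review-id] element, and it assumes the aria-label reads something like "Rated 4 stars out of five stars" and that the review body sits at index 3 of the parent element's text, as observed when inspecting the page.

from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement


def parse_review_header(review: WebElement) -> dict:
    """Pull one review's fields out of a header[data-review-id] element (sketch only)."""
    header_text = review.text.split("\n")
    # Splitting the aria-label on spaces and taking index 1 grabs the numeric rating
    stars = review.find_element(By.CSS_SELECTOR, "div[role='img']").get_attribute("aria-label").split(" ")[1]
    # ".." walks up to the header's parent, whose text also contains the review body
    body_lines = review.find_element(By.XPATH, "..").text.split("\n")
    return {
        "name": header_text[0],
        "date": header_text[2],
        "stars": stars,
        "description": body_lines[3],
    }

With that in place, the full code for this iteration follows below.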
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.find_elements(By.CSS_SELECTOR, "div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=href, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [location] * len(keywords), [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_app(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url, location=location) review_container = driver.find_element(By.CSS_SELECTOR, "div[data-g-id='reviews']") review_headers = review_container.find_elements(By.CSS_SELECTOR, "header[data-review-id]") for review in review_headers: header_text = review.text.split("\n") stars = review.find_element(By.CSS_SELECTOR, "div[role='img']").get_attribute("aria-label").split(" ")[1] name = header_text[0] date = header_text[2] description = review.find_element(By.XPATH, "..").text.split("\n")[3] review_data = { "name": name, "date": date, "stars": stars, "description": description } print(review_data) success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_app(row, location, retries=retries) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of 
keywords to scrape keyword_list = ["crypto wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting scrape...") process_results(filename, LOCATION, retries=MAX_RETRIES) logger.info("Scrape Complete")
Our search results already get saved through a DataPipeline. However, we're once again extracting our review data into a dict. To fix this, we need to create another strongly typed dataclass to ensure that our data is formatted properly when we store it. This next one is called ReviewData. It contains the same methods as SearchData, but the fields are a bit different. Our new fields are name, date, stars, and description.

@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    stars: int = 0
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
Now, we open a new DataPipeline from process_app(). We then pass all of these ReviewData
objects into the pipeline as we find them.import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" stars: int = 0 description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.find_elements(By.CSS_SELECTOR, "div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=href, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [location] * len(keywords), [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_app(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(url, location=location) review_container = driver.find_element(By.CSS_SELECTOR, "div[data-g-id='reviews']") review_headers = review_container.find_elements(By.CSS_SELECTOR, "header[data-review-id]") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in review_headers: header_text = review.text.split("\n") stars = review.find_element(By.CSS_SELECTOR, "div[role='img']").get_attribute("aria-label").split(" ")[1] name = header_text[0] date = header_text[2] description = review.find_element(By.XPATH, "..").text.split("\n")[3] review_data = ReviewData( name=name, date=date, stars=stars, description=description ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) for row in reader: process_app(row, location, retries=retries) if 
__name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting scrape...") process_results(filename, LOCATION, retries=MAX_RETRIES) logger.info("Scrape Complete")
To add concurrency to our review scraper, we rewrite process_results(). Like before, we utilize ThreadPoolExecutor and pass our parser into it. Instead of our keywords list, it takes our reader object, which contains all the rows from our CSV file.

def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_app,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )
The only other change comes inside process_app(). When we fetch the page, we swap our regular url for a proxied one:

driver.get(get_scrapeops_url(url, location=location))

Our finalized code is available below.
import osimport csvimport jsonimport loggingfrom urllib.parse import urlencodeimport concurrent.futuresfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom dataclasses import dataclass, field, fields, asdict API_KEY = "" OPTIONS = webdriver.ChromeOptions()OPTIONS.add_argument("--headless")OPTIONS.add_argument("--disable-javascript") with open("config.json", "r") as config_file: config = json.load(config_file) API_KEY = config["api_key"] def get_scrapeops_url(url, location="us"): payload = { "api_key": API_KEY, "url": url, "country": location, "wait": 5000, } proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload) return proxy_url ## Logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__) @dataclassclass SearchData: name: str = "" stars: float = 0 url: str = "" publisher: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) @dataclassclass ReviewData: name: str = "" date: str = "" stars: int = 0 description: str = "" def __post_init__(self): self.check_string_fields() def check_string_fields(self): for field in fields(self): # Check string fields if isinstance(getattr(self, field.name), str): # If empty set default text if getattr(self, field.name) == "": setattr(self, field.name, f"No {field.name}") continue # Strip any trailing spaces, etc. value = getattr(self, field.name) setattr(self, field.name, value.strip()) class DataPipeline: def __init__(self, csv_filename="", storage_queue_limit=50): self.names_seen = [] self.storage_queue = [] self.storage_queue_limit = storage_queue_limit self.csv_filename = csv_filename self.csv_file_open = False def save_to_csv(self): self.csv_file_open = True data_to_save = [] data_to_save.extend(self.storage_queue) self.storage_queue.clear() if not data_to_save: return keys = [field.name for field in fields(data_to_save[0])] file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0 with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file: writer = csv.DictWriter(output_file, fieldnames=keys) if not file_exists: writer.writeheader() for item in data_to_save: writer.writerow(asdict(item)) self.csv_file_open = False def is_duplicate(self, input_data): if input_data.name in self.names_seen: logger.warning(f"Duplicate item found: {input_data.name}. 
Item dropped.") return True self.names_seen.append(input_data.name) return False def add_data(self, scraped_data): if self.is_duplicate(scraped_data) == False: self.storage_queue.append(scraped_data) if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False: self.save_to_csv() def close_pipeline(self): if self.csv_file_open: time.sleep(3) if len(self.storage_queue) > 0: self.save_to_csv() def scrape_search_results(keyword, location, data_pipeline=None, retries=3): formatted_keyword = keyword.replace(" ", "+") url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps" tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) scrapeops_proxy_url = get_scrapeops_url(url, location=location) response = driver.get(scrapeops_proxy_url) div_cards = driver.find_elements(By.CSS_SELECTOR, "div[role='listitem']") Excluded_words = ["Apps & games", "Movies & TV", "Books"] for div_card in div_cards: if div_card.text in Excluded_words: continue info_rows = div_card.find_elements(By.CSS_SELECTOR, "div div span") name = info_rows[1].text publisher = info_rows[2].text href = div_card.find_element(By.CSS_SELECTOR, "a").get_attribute("href") rating = 0.0 if info_rows[3].text != None: rating = info_rows[3].text search_data = SearchData( name=name, stars=rating, url=href, publisher=publisher ) data_pipeline.add_data(search_data) logger.info(f"Successfully parsed data from: {url}") success = True except Exception as e: logger.error(f"An error occurred while processing page {url}: {e}") logger.info(f"Retrying request for page: {url}, retries left {retries-tries}") tries+=1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3): with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( scrape_search_results, keywords, [location] * len(keywords), [data_pipeline] * len(keywords), [retries] * len(keywords) ) def process_app(row, location, retries=3): url = row["url"] tries = 0 success = False while tries <= retries and not success: try: driver = webdriver.Chrome(options=OPTIONS) driver.get(get_scrapeops_url(url, location=location)) review_container = driver.find_element(By.CSS_SELECTOR, "div[data-g-id='reviews']") review_headers = review_container.find_elements(By.CSS_SELECTOR, "header[data-review-id]") review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv") for review in review_headers: header_text = review.text.split("\n") stars = review.find_element(By.CSS_SELECTOR, "div[role='img']").get_attribute("aria-label").split(" ")[1] name = header_text[0] date = header_text[2] description = review.find_element(By.XPATH, "..").text.split("\n")[3] review_data = ReviewData( name=name, date=date, stars=stars, description=description ) review_pipeline.add_data(review_data) review_pipeline.close_pipeline() success = True except Exception as e: logger.error(f"Exception thrown: {e}") logger.warning(f"Failed to process page: {row['url']}") logger.warning(f"Retries left: {retries-tries}") tries += 1 finally: driver.quit() if not success: raise Exception(f"Max Retries exceeded: {retries}") else: logger.info(f"Successfully parsed: {row['url']}") def process_results(csv_file, location, max_threads=5, retries=3): logger.info(f"processing {csv_file}") with open(csv_file, newline="") as file: reader = list(csv.DictReader(file)) with 
concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: executor.map( process_app, reader, [location] * len(reader), [retries] * len(reader) ) if __name__ == "__main__": MAX_RETRIES = 3 MAX_THREADS = 5 LOCATION = "us" logger.info(f"Crawl starting...") ## INPUT ---> List of keywords to scrape keyword_list = ["crypto wallet", "web3 wallet"] aggregate_files = [] ## Job Processes filename = "report.csv" crawl_pipeline = DataPipeline(csv_filename=filename) start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES) crawl_pipeline.close_pipeline() logger.info(f"Crawl complete.") logger.info("Starting scrape...") process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES) logger.info("Scrape Complete")
To test everything out, we'll use the same main that we used earlier. Feel free to change MAX_RETRIES, MAX_THREADS, LOCATION, or keyword_list.

if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet", "web3 wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"
    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")

    logger.info("Starting scrape...")
    process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    logger.info("Scrape Complete")
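As one example of tuning the run, you might lower the thread count and switch the proxy location. The values below are just one possible configuration; the keyword is made up, and "uk" is simply one of the codes from the country table earlier.

if __name__ == "__main__":
    MAX_RETRIES = 2
    MAX_THREADS = 3
    LOCATION = "uk"

    ## A different search, routed through the UK with fewer threads
    keyword_list = ["password manager"]

    filename = "report.csv"
    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()

    process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)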