How to Scrape Google Play With Requests and BeautifulSoup
Google Play is one of the largest app stores in the world, and most Android apps are available for download through the Play Store. When we scrape Google Play, we can collect all sorts of data such as ratings, reviews, publishers and more.
In this tutorial, we're going to build a search crawler and an app scraper to get data from Google Play.
- TLDR - How to Scrape Google Play
- How To Architect Our Google Play Scraper
- Understanding How To Scrape Google Play
- Setting Up Our Google Play Scraper Project
- Build A Google Play Search Crawler
- Build A Google Play Scraper
- Legal and Ethical Considerations
- Conclusion
- More Cool Articles
TLDR - How to Scrape Google Play
Need to scrape Google Play? Don't have time to read? Use the prebuilt scraper we have right here.
- First, you'll need to create a new folder with a config.json file inside.
- Inside the config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
- Then, copy and paste the code below into a new Python file. You can run the file with python name_of_your_file.py.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    stars: int = 0
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write; release the flag before returning
            self.csv_file_open = False
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            # Only write the header when the file is new or empty
            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[role='listitem']")
            excluded_words = ["Apps & games", "Movies & TV", "Books"]

            for div_card in div_cards:
                if div_card.text in excluded_words:
                    continue
                info_rows = div_card.select("div div span")
                name = info_rows[1].text
                publisher = info_rows[2].text
                href = div_card.find("a").get("href")
                link = f"https://play.google.com{href}"
                rating = 0.0
                # Keep the default rating when the rating span is empty
                if info_rows[3].text:
                    rating = info_rows[3].text

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=link,
                    publisher=publisher
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt so the loop can terminate
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [location] * len(keywords),
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )


def process_app(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            # Request inside the try block so connection errors are retried too
            response = requests.get(get_scrapeops_url(url, location=location))
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_container = soup.select_one("div[data-g-id='reviews']")
                review_headers = review_container.find_all("header")

                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
                for review in review_headers:
                    # Each star is rendered as an svg element inside the header
                    stars = len(review.find_all("svg"))
                    card = review.parent
                    divs = card.select("div div div div div")
                    name = divs[1].text
                    date = divs[10].text
                    description = divs[12].text

                    review_data = ReviewData(
                        name=name,
                        date=date,
                        stars=stars,
                        description=description
                    )
                    review_pipeline.add_data(review_data)

                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    # The report was written as UTF-8, so read it back the same way
    with open(csv_file, newline="", encoding="utf-8") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_app,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet", "web3 wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"

    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info("Crawl complete.")

    logger.info("Starting scrape...")
    process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    logger.info("Scrape Complete")
This code will generate a search report, report.csv. Then it will generate an individual report for each app that we scraped and saved in report.csv. Feel free to adjust any of the following constants to fit your scraping project.
- MAX_RETRIES: Sets the maximum number of retry attempts the script will make if a request fails.
- MAX_THREADS: Sets the maximum number of threads (or concurrent tasks) that the script will use when scraping data.
- LOCATION: Specifies the country code for the location from which you want to simulate the scraping requests.
- keyword_list: A list of keywords or phrases that the script will use to search for listings on the store.
How To Architect Our Google Play Scraper
Our scraper project will consist of two separate scrapers, a result crawler and an app scraper.
- Our crawler will perform a search, then parse and store the results.
- Our scraper will be looking up an app, parsing and storing reviews about that app.
Here is the process for our crawler:
- Parse information from a search page.
- Store the parsed results from the search.
- Concurrently execute steps one and two for multiple searches.
- Proxy integration will help us avoid anti-bots and roadblocks.
Here is the process for our scraper:
- Read the CSV file from the crawl.
- Parse each app from the CSV.
- Store the data from the parse.
- Concurrently run steps two and three on multiple rows.
- Proxy Integration will once again get us past anti-bots.
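As a rough sketch of the first scraper step, the rows of the crawl report can be loaded with csv.DictReader. This assumes the report uses the column names from this article (name, stars, url, publisher). The helper name load_rows and the sample row are hypothetical and only here for illustration.

```python
import csv
import io


def load_rows(csv_text):
    """Parse CSV text into a list of dicts, one per app row."""
    return list(csv.DictReader(io.StringIO(csv_text)))


# Hypothetical sample of what a crawl report might contain.
sample_report = (
    "name,stars,url,publisher\n"
    "Example Wallet,4.5,"
    "https://play.google.com/store/apps/details?id=com.example.wallet,"
    "Example Inc\n"
)

rows = load_rows(sample_report)
for row in rows:
    # Each row is a dict keyed by the CSV header names.
    print(row["name"], row["url"])
```

Each dict in rows is what the scraper would later hand to its per-app parsing function.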
Understanding How To Scrape Google Play
Before we build this project, we need to get a better understanding of Google Play at a high level. Google Play has results pages and it also has individual app pages.
In the coming sections, we're going to take a look at these pages in more detail and find exactly the information we'll be extracting.
Step 1: How To Request Google Play Pages
As mentioned above, there are two types of pages we need to get. We fetch these pages with a GET request. As we take a look at these URLs, we'll be able to reconstruct them from inside our scraper. If you take a look at the image below, you'll see results for the term "crypto wallet".
Here is our results page. As you can see, our URL is constructed like this:
https://play.google.com/store/search?q={keyword}&c=apps
Here is an individual app page. You can see the URL in the address bar, but luckily, we don't need to reconstruct this one. Our app URLs will be extracted during our search.
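The search URL format shown above can be rebuilt from a keyword in a few lines. The sketch below uses urlencode from the standard library instead of manual string replacement, so spaces and special characters are escaped for us. The function name build_search_url is an assumption made for this example.

```python
from urllib.parse import urlencode


def build_search_url(keyword):
    """Build a Google Play app-search URL for a keyword."""
    # urlencode turns spaces into '+' and escapes other unsafe characters.
    query = urlencode({"q": keyword, "c": "apps"})
    return f"https://play.google.com/store/search?{query}"


print(build_search_url("crypto wallet"))
```

For a plain keyword like "crypto wallet", this produces the same URL as replacing spaces with "+" by hand.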
Step 2: How To Extract Data From Google Play Results and Pages
Now let's look at the HTML data we're going to be extracting. We'll start with the search results page.
As you can see below, each item on the page has a role of listitem. When we search for these items, we'll use the CSS selector div[role='listitem']. From there, we can pull all of the data we need.
Now, we'll look at how we're going to extract review data. All of our reviews are embedded within a div container with a data-g-id of reviews. From within this container, we're going to go through and pull all of our reviews.
Step 3: Geolocated Data
To handle geolocation, we'll be using the ScrapeOps Proxy API. The ScrapeOps Proxy Aggregator allows us to pass in a country param.
When we choose a country, ScrapeOps will route us through a server within that country.
- If we want to appear in the US, we'll pass "country": "us".
- We can also pass another parameter, residential. This one is a boolean.
- If we set "residential": True, ScrapeOps will assign us a residential IP address, which greatly decreases our likelihood of getting blocked.
Setting Up Our Google Play Scraper Project
Let's get started. You can run the following commands to get set up.
Create a New Project Folder
mkdir google-play-scraper
cd google-play-scraper
Create a New Virtual Environment
python -m venv venv
Activate the Environment
source venv/bin/activate
Install Our Dependencies
pip install requests
pip install beautifulsoup4
Build A Google Play Search Crawler
It's finally time to get started. We'll start off by building a crawler. We'll add a parser, and then we'll continue building on top of it from there. We'll add the following in order.
- Parsing
- Data Storage
- Concurrency
- Proxy Integration
Step 1: Create Simple Search Data Parser
We'll get started by creating an initial parser. The code we create here will give us the basic structure to build off of for the rest of the project. We'll add retries, error handling and our initial parsing function.
While the basic structure is important, what you should really pay attention to here is the parsing function, scrape_search_results().
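The retry structure described above can be looked at in isolation. This is a minimal illustration rather than the article's code: fetch_with_retries and flaky_fetch are hypothetical names, and the simulated failure stands in for a real network request.

```python
def fetch_with_retries(fetch, retries=3):
    """Call fetch() until it succeeds or the retry budget is spent."""
    tries = 0
    while tries <= retries:
        try:
            return fetch()
        except Exception as e:
            print(f"Attempt {tries + 1} failed: {e}")
            # Without this increment the loop would never terminate.
            tries += 1
    raise Exception(f"Max retries exceeded: {retries}")


# Hypothetical stand-in for a network call: fails twice, then succeeds.
attempts = {"count": 0}


def flaky_fetch():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated failure")
    return "ok"


result = fetch_with_retries(flaky_fetch)
print(result)
```

The important detail is that the attempt counter is incremented on every failure, so a page that keeps failing eventually raises instead of looping forever.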
Here is our starter script.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write; release the flag before returning
            self.csv_file_open = False
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            # Only write the header when the file is new or empty
            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[role='listitem']")
            excluded_words = ["Apps & games", "Movies & TV", "Books"]

            for div_card in div_cards:
                if div_card.text in excluded_words:
                    continue
                info_rows = div_card.select("div div span")
                name = info_rows[1].text
                publisher = info_rows[2].text
                href = div_card.find("a").get("href")
                link = f"https://play.google.com{href}"
                rating = 0.0
                # Keep the default rating when the rating span is empty
                if info_rows[3].text:
                    rating = info_rows[3].text

                search_data = {
                    "name": name,
                    "stars": rating,
                    "url": link,
                    "publisher": publisher
                }
                print(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt so the loop can terminate
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, retries=3):
    for keyword in keywords:
        scrape_search_results(keyword, location, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"

    start_scrape(keyword_list, LOCATION, retries=MAX_RETRIES)
    logger.info("Crawl complete.")
In scrape_search_results(), while the operation hasn't succeeded, we do the following:
- div_cards = soup.select("div[role='listitem']") finds all div tags with the role listitem.
- We use an array of excluded words to filter out unwanted div cards.
- info_rows = div_card.select("div div span") finds all of the rows inside each result card.
- We then pull the name, publisher and rating from our info_rows.
- We also pull the href attribute with href = div_card.find("a").get("href") and use some basic string formatting to reconstruct the full link.
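For the link reconstruction step, here is a small sketch of an alternative to plain string formatting: urljoin from the standard library. This is a swapped-in technique, not what the article's script does. The name absolute_link and the sample href are made up for the example.

```python
from urllib.parse import urljoin

BASE_URL = "https://play.google.com"


def absolute_link(href):
    """Resolve an href from a result card against the site root."""
    return urljoin(BASE_URL, href)


# Hypothetical href, shaped like the relative links on a results page.
print(absolute_link("/store/apps/details?id=com.example.wallet"))
```

One advantage of urljoin is that an href that is already absolute passes through unchanged, whereas string formatting would prepend the domain a second time.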
Step 2: Storing the Scraped Data
After parsing data, we need to store it. Without storage, our parsing function is pretty much useless. When we store our data, we can review that CSV file later. Not only can we review the file ourselves, our app scraper will be able to look up each app from the CSV.
For proper storage, we're going to need to create a couple of different classes, SearchData and DataPipeline.
Here is SearchData. We'll use it to hold data for individual search items.
@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
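To see what the string cleanup in SearchData does in practice, here is a short self-contained run. The class body is a condensed version of the one above, and the sample values are invented for the demonstration.

```python
from dataclasses import dataclass, fields


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for f in fields(self):
            value = getattr(self, f.name)
            if isinstance(value, str):
                if value == "":
                    # Empty strings get a readable placeholder
                    setattr(self, f.name, f"No {f.name}")
                else:
                    # Non-empty strings are stripped of surrounding whitespace
                    setattr(self, f.name, value.strip())


# Invented sample: padded name, no url or publisher supplied.
item = SearchData(name="  Example Wallet  ", stars=4.5)
print(item)
```

The padded name is stripped, and the missing url and publisher fields fall back to "No url" and "No publisher", so the CSV never contains blank string cells.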
Here is our DataPipeline. The DataPipeline will be used to open a pipeline to a CSV file. This pipeline takes in dataclass objects and pipes them to the CSV file while removing duplicate ones.
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write; release the flag before returning
            self.csv_file_open = False
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            # Only write the header when the file is new or empty
            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        # time.sleep() requires "import time" at the top of the script
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
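The two ideas behind the pipeline, dropping duplicate names and writing in batches, can be tried out with a trimmed-down stand-in. MiniPipeline and Item below are hypothetical names for this sketch, not the article's classes, and the queue limit is set to 2 so that a flush happens mid-run.

```python
import csv
import os
import tempfile
from dataclasses import dataclass, asdict, fields


@dataclass
class Item:  # hypothetical stand-in for SearchData
    name: str = ""
    url: str = ""


class MiniPipeline:
    """Trimmed-down illustration of the dedupe-then-batch-write idea."""

    def __init__(self, csv_filename, storage_queue_limit=2):
        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename

    def add_data(self, item):
        if item.name in self.names_seen:
            return  # duplicate name: drop it
        self.names_seen.add(item.name)
        self.storage_queue.append(item)
        if len(self.storage_queue) >= self.storage_queue_limit:
            self.save_to_csv()

    def save_to_csv(self):
        if not self.storage_queue:
            return
        batch, self.storage_queue = self.storage_queue, []
        new_file = (not os.path.isfile(self.csv_filename)
                    or os.path.getsize(self.csv_filename) == 0)
        with open(self.csv_filename, "a", newline="", encoding="utf-8") as f:
            keys = [fld.name for fld in fields(batch[0])]
            writer = csv.DictWriter(f, fieldnames=keys)
            if new_file:
                writer.writeheader()  # header only once, on the first batch
            for item in batch:
                writer.writerow(asdict(item))

    def close_pipeline(self):
        self.save_to_csv()  # flush whatever is left in the queue


path = os.path.join(tempfile.mkdtemp(), "report.csv")
pipeline = MiniPipeline(path)
for name in ["App A", "App B", "App A", "App C"]:
    pipeline.add_data(Item(name=name, url=f"https://example.com/{name[-1]}"))
pipeline.close_pipeline()

with open(path, newline="", encoding="utf-8") as f:
    saved = list(csv.DictReader(f))
print([row["name"] for row in saved])
```

The second "App A" is dropped, the first two items are written when the queue fills, and closing the pipeline flushes the remaining item.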
In our full code example below, we open a DataPipeline and pass it into start_scrape(), which in turn passes it into scrape_search_results(). From within scrape_search_results(), instead of printing our data to the terminal, we use it to create a SearchData object. That object then gets passed into our DataPipeline.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write; release the flag before returning
            self.csv_file_open = False
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            # Only write the header when the file is new or empty
            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[role='listitem']")
            excluded_words = ["Apps & games", "Movies & TV", "Books"]

            for div_card in div_cards:
                if div_card.text in excluded_words:
                    continue
                info_rows = div_card.select("div div span")
                name = info_rows[1].text
                publisher = info_rows[2].text
                href = div_card.find("a").get("href")
                link = f"https://play.google.com{href}"
                rating = 0.0
                # Keep the default rating when the rating span is empty
                if info_rows[3].text:
                    rating = info_rows[3].text

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=link,
                    publisher=publisher
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt so the loop can terminate
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, data_pipeline=None, retries=3):
    for keyword in keywords:
        scrape_search_results(keyword, location, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"

    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info("Crawl complete.")
- crawl_pipeline = DataPipeline(csv_filename=filename) creates a DataPipeline.
- From within scrape_search_results(), we turn our parsed data into a SearchData object and pass it into the pipeline.
- After we've completed the crawl, we close the pipeline with crawl_pipeline.close_pipeline().
Step 3: Adding Concurrency
The next portion of our project is to add concurrency. At the moment, we use a for loop to iterate through our keyword_list. In this section, we're going to replace that for loop with ThreadPoolExecutor, which gives us the power of multithreading.
Here is our refactored start_scrape() function.
def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [location] * len(keywords),
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )
All of the real logic is happening inside of executor.map():
- scrape_search_results is the function we'd like to call on our available threads.
- keywords is an array of keywords we want to search.
- All other arguments get passed in as arrays and subsequently get passed into scrape_search_results on each call.
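To make the argument passing concrete, here is a small sketch of executor.map() with several iterables. The describe function is a hypothetical stand-in for scrape_search_results that only reports what it received, so no network access is needed.

```python
import concurrent.futures


def describe(keyword, location, retries):
    """Stand-in for scrape_search_results: just reports its arguments."""
    return f"{keyword} | {location} | retries={retries}"


keywords = ["crypto wallet", "web3 wallet"]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # map() zips the iterables: call i receives the i-th element of each list.
    results = list(executor.map(
        describe,
        keywords,
        ["us"] * len(keywords),
        [3] * len(keywords),
    ))

print(results)
```

Results come back in the same order as the input keywords. Iterating over the results (here with list()) also re-raises any exception that occurred inside a worker, which would otherwise go unnoticed.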
Here is our fully updated Python script.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write; release the flag before returning
            self.csv_file_open = False
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            # Only write the header when the file is new or empty
            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            response = requests.get(url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")

            ## Extract Data
            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[role='listitem']")
            excluded_words = ["Apps & games", "Movies & TV", "Books"]

            for div_card in div_cards:
                if div_card.text in excluded_words:
                    continue
                info_rows = div_card.select("div div span")
                name = info_rows[1].text
                publisher = info_rows[2].text
                href = div_card.find("a").get("href")
                link = f"https://play.google.com{href}"
                rating = 0.0
                # Keep the default rating when the rating span is empty
                if info_rows[3].text:
                    rating = info_rows[3].text

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=link,
                    publisher=publisher
                )
                data_pipeline.add_data(search_data)

            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            # Count the failed attempt so the loop can terminate
            tries += 1

    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [location] * len(keywords),
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"

    logger.info("Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"

    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info("Crawl complete.")
- We replaced the for loop from start_scrape() with ThreadPoolExecutor.
- Our first argument to executor.map() is the function we want to call.
- All other arguments get passed in as arrays to get passed into the first function.
Step 4: Bypassing Anti-Bots
Time to unleash the power of the proxy. The ScrapeOps Proxy API allows us to be routed through servers and appear as if we're in a different location. We pass our URL and a few other arguments into this function and it uses URL encoding to give us a proxied URL.
def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url
- "api_key": holds our ScrapeOps API key.
- "url": is the URL that we'd like to scrape.
- "country": is the country we'd like to appear in.
- "wait": is how long we want the ScrapeOps server to wait before sending our response back.
- "residential": is a boolean that lets ScrapeOps know if we want a residential IP. If we set it to True, we get a residential IP instead of a datacenter IP address. This greatly decreases our likelihood of getting blocked.
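To check what the proxied URL actually carries, we can build one and decode its query string again. This sketch repeats the function with a placeholder API key passed in as an argument (an assumption made so the example runs without config.json). It only builds and inspects the URL and sends no request.

```python
from urllib.parse import urlencode, urlparse, parse_qs


def get_scrapeops_url(url, location="us", api_key="YOUR-API-KEY"):
    # Same payload shape as in the article; api_key is a placeholder here.
    payload = {
        "api_key": api_key,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True,
    }
    return "https://proxy.scrapeops.io/v1/?" + urlencode(payload)


target = "https://play.google.com/store/search?q=crypto+wallet&c=apps"
proxied = get_scrapeops_url(target, location="us")

# Decode the query string again to confirm nothing was lost.
params = parse_qs(urlparse(proxied).query)
print(params["url"][0])
print(params["country"][0], params["residential"][0])
```

Because urlencode percent-encodes the target URL, its own "?" and "&" characters don't get confused with the proxy's parameters, and the original URL is recovered intact on decoding.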
Once our crawler is all put together, it looks like this.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            # Nothing to write; release the flag before returning
            self.csv_file_open = False
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            # Only write the header when the file is new or empty
            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if not self.is_duplicate(scraped_data):
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and not self.csv_file_open:
                self.save_to_csv()

    def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")
Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")
name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text
search_data = SearchData(
name=name,
stars=rating,
url=link,
publisher=publisher
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[location] * len(keywords),
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet"]
aggregate_files = []
## Job Processes
filename = "report.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
Our crawler is now ready for production testing.
Step 5: Production Run
Time for the production run. I'm going to add another keyword to our list in the main block. Aside from that, everything else will stay the same.
Take a look below.
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    logger.info(f"Crawl starting...")
    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet", "bitcoin wallet"]
    aggregate_files = []
    ## Job Processes
    filename = "report.csv"
    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")
Feel free to change any of the following to tweak your results:
MAX_THREADS
MAX_RETRIES
LOCATION
keyword_list
Here are our results from two searches.
We generated a CSV file with 23 results in 18.165 seconds. This comes out to roughly 9 seconds per search. Considering that the ScrapeOps server is waiting for 5 of those 9 seconds, this isn't bad.
Build A Google Play Scraper
Now, we're going to build an app scraper. This scraper will read our CSV file and then lookup and parse each app from the file. Then it will create a new file for each app containing reviews for that app.
Here are the steps we'll go through.
- Create a parsing function.
- Load the URLs to scrape.
- Store the newly parsed data.
- Add concurrency to the scraper.
- Integrate with the ScrapeOps proxy.
Step 1: Create Simple Business Data Parser
Our basic parser is pretty similar to our first one. We have some basic error handling, retries and our initial parsing logic. Just like earlier, the parsing logic is where you really need to pay attention.
Here is our parsing function.
def process_app(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            # The request lives inside the try block so network errors also count as a failed attempt
            response = requests.get(url)
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")
                soup = BeautifulSoup(response.text, "html.parser")
                review_container = soup.select_one("div[data-g-id='reviews']")
                review_headers = review_container.find_all("header")
                for review in review_headers:
                    # Each star in the rating is rendered as an svg element
                    stars = len(review.find_all("svg"))
                    card = review.parent
                    divs = card.select("div div div div div")
                    name = divs[1].text
                    date = divs[10].text
                    description = divs[12].text
                    review_data = {
                        "name": name,
                        "date": date,
                        "stars": stars,
                        "description": description
                    }
                    print(review_data)
                success = True
            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")
- First, we find our review_container with soup.select_one("div[data-g-id='reviews']").
- Next, we find a list of header elements, one for each review, with review_container.find_all("header").
- We then iterate through the review_headers.
- On each header, we pull the following information:
  - stars = len(review.find_all("svg"))
  - A list of super nested divs, card.select("div div div div div")
  - We pull the name, date and description from the list of divs.
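The parser reads fixed positions (1, 10 and 12) from the list of divs, so a review card with a slightly different layout would raise an IndexError and fail the whole attempt. One possible guard is sketched below. The text_at() helper is hypothetical and not part of the original scraper, and the SimpleNamespace objects merely stand in for BeautifulSoup elements.

```python
from types import SimpleNamespace


def text_at(items, index, default=""):
    """Return items[index].text, or default when that position does not exist."""
    try:
        return items[index].text
    except IndexError:
        return default


# Stand-ins for parsed elements: only two entries, so index 10 is missing.
divs = [SimpleNamespace(text="avatar"), SimpleNamespace(text="Jane Doe")]

name = text_at(divs, 1)
date = text_at(divs, 10, default="No date")
print(name, date)
```

With a guard like this, one oddly shaped review yields a default value instead of discarding every review on the page.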
Step 2: Loading URLs To Scrape
Now, we need to read our CSV file. Our scraper is going to read the rows of the CSV file and then pass them into the parsing function we just created. Let's make a new function, kind of similar to start_scrape(). We'll call this one process_results().
Here is process_results(). Later on, we'll replace the for loop with multithreading like we did before.
def process_results(csv_file, location, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
    for row in reader:
        process_app(row, location, retries=retries)
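To show what each row looks like once csv.DictReader has read it, here is a small sketch that uses an in-memory file instead of report.csv. The sample row is made up for illustration; only the column names match the SearchData fields.

```python
import csv
import io

# Made-up sample data with the same columns as the crawler's report.
sample_csv = io.StringIO(
    "name,stars,url,publisher\n"
    "Example Wallet,4.5,https://play.google.com/store/apps/details?id=com.example.wallet,Example Inc\n"
)

rows = list(csv.DictReader(sample_csv))

# Each row is a dict keyed by the header line; every value is a string.
print(rows[0]["url"])
print(type(rows[0]["stars"]))
```

This is why process_app() can simply read row["url"]. Note that numeric columns such as stars come back as strings and need converting before any arithmetic.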
After putting everything together, this is how our code looks.
import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")
Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")
name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text
search_data = SearchData(
name=name,
stars=rating,
url=link,
publisher=publisher
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[location] * len(keywords),
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
def process_app(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_container = soup.select_one("div[data-g-id='reviews']")
review_headers = review_container.find_all("header")
for review in review_headers:
stars = len(review.find_all("svg"))
card = review.parent
divs = card.select("div div div div div")
name = divs[1].text
date = divs[10].text
description = divs[12].text
review_data = {
"name": name,
"date": date,
"stars": stars,
"description": description
}
print(review_data)
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_app(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet"]
aggregate_files = []
## Job Processes
filename = "report.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
logger.info("Starting scrape...")
process_results(filename, LOCATION, retries=MAX_RETRIES)
logger.info("Scrape Complete")
Step 3: Storing the Scraped Data
To add data storage, we'll need to add another dataclass. We'll call this one ReviewData. It will hold the following fields:
- name
- date
- stars
- description
@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    stars: int = 0
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())
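To see what the cleanup in check_string_fields() does in practice, the sketch below uses a condensed copy of the class with made-up review values. It is only meant to show the behavior, not to replace the class above.

```python
from dataclasses import dataclass, fields


@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    stars: int = 0
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            value = getattr(self, field.name)
            if isinstance(value, str):
                if value == "":
                    # Empty strings get a readable placeholder
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Surrounding whitespace is stripped
                setattr(self, field.name, value.strip())


# Made-up values: padded name, empty date, trailing newline in the description.
review = ReviewData(name="  Jane Doe ", date="", stars=4, description="Works well.\n")
print(review)
```

The integer stars field is left untouched because only string fields are checked.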
After creating this class, we can go ahead and pass it into another DataPipeline. You can see this in our fully updated code below.
import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
stars: int = 0
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")
Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")
name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text
search_data = SearchData(
name=name,
stars=rating,
url=link,
publisher=publisher
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[location] * len(keywords),
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
def process_app(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(url)
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_container = soup.select_one("div[data-g-id='reviews']")
review_headers = review_container.find_all("header")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for review in review_headers:
stars = len(review.find_all("svg"))
card = review.parent
divs = card.select("div div div div div")
name = divs[1].text
date = divs[10].text
description = divs[12].text
review_data = ReviewData(
name=name,
date=date,
stars=stars,
description=description
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
for row in reader:
process_app(row, location, retries=retries)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet"]
aggregate_files = []
## Job Processes
filename = "report.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
logger.info("Starting scrape...")
process_results(filename, LOCATION, retries=MAX_RETRIES)
logger.info("Scrape Complete")
- ReviewData represents an individual review in our software.
- DataPipeline saves our ReviewData to a CSV file.
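Each review file is named after the app, with spaces replaced by hyphens. App names can also contain characters such as ":" or "/" that are awkward or invalid in file names, so a stricter cleanup may be worth considering. The safe_filename() helper below is a hypothetical sketch, not part of the original scraper, and the app name is made up.

```python
import re


def safe_filename(app_name, extension=".csv"):
    # Collapse every run of characters other than ASCII letters, digits,
    # dot, underscore or hyphen into a single hyphen.
    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "-", app_name).strip("-")
    # Fall back to a fixed name if nothing usable is left.
    return (cleaned or "unnamed-app") + extension


filename = safe_filename("Example: Crypto & Bitcoin Wallet")
print(filename)
```

This version is deliberately conservative: non-ASCII characters are collapsed too, which may be too aggressive for app names written in other scripts.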
Step 4: Adding Concurrency
When we add concurrency, we'll do the same thing we did before. We're going to replace our for loop with ThreadPoolExecutor. Here is our refactored process_results() function.
def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            process_app,
            reader,
            [location] * len(reader),
            [retries] * len(reader)
        )
- process_app is the function we want to call on multiple threads this time.
- reader is the array of rows from our CSV file.
- location and retries also get passed in as arrays, just like before.
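One thing to be aware of: executor.map() only re-raises a worker's exception when its result is read from the returned iterator. Since the code above never reads those results, an app that runs out of retries fails silently. The sketch below shows one alternative using submit() and as_completed(). The process_row() and run_rows() functions are hypothetical stand-ins, and the rows are made up.

```python
import concurrent.futures


def process_row(row):
    # Hypothetical stand-in for process_app(); fails for rows without a URL.
    if not row.get("url"):
        raise ValueError(f"missing url for {row.get('name')}")
    return row["name"]


def run_rows(rows, max_threads=5):
    succeeded, failed = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Remember which row each future belongs to.
        future_to_row = {executor.submit(process_row, row): row for row in rows}
        for future in concurrent.futures.as_completed(future_to_row):
            row = future_to_row[future]
            try:
                # result() re-raises any exception thrown inside the worker.
                succeeded.append(future.result())
            except Exception as exc:
                failed.append((row["name"], str(exc)))
    return succeeded, failed


rows = [
    {"name": "App A", "url": "https://example.com/a"},
    {"name": "App B", "url": ""},
]
succeeded, failed = run_rows(rows)
print(succeeded, failed)
```

Collecting failures this way makes it possible to log them or retry them later, at the cost of a little more code than executor.map().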
Step 5: Bypassing Anti-Bots
To bypass anti-bots, we just need to change one line. get_scrapeops_url() was already defined earlier. Now, we just need to use it again. We'll change one line of our parsing function to unleash the proxy.
response = requests.get(get_scrapeops_url(url, location=location))
Here is our production ready code.
import os
import time
import csv
import requests
import json
import logging
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict
API_KEY = ""
with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]
def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
@dataclass
class ReviewData:
name: str = ""
date: str = ""
stars: int = 0
description: str = ""
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return
keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False
while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data
soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")
Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")
name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text
search_data = SearchData(
name=name,
stars=rating,
url=link,
publisher=publisher
)
data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True
except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[location] * len(keywords),
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)
def process_app(row, location, retries=3):
url = row["url"]
tries = 0
success = False
while tries <= retries and not success:
try:
response = requests.get(get_scrapeops_url(url, location=location))
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
review_container = soup.select_one("div[data-g-id='reviews']")
review_headers = review_container.find_all("header")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for review in review_headers:
stars = len(review.find_all("svg"))
card = review.parent
divs = card.select("div div div div div")
name = divs[1].text
date = divs[10].text
description = divs[12].text
review_data = ReviewData(
name=name,
date=date,
stars=stars,
description=description
)
review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True
else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_app,
reader,
[location] * len(reader),
[retries] * len(reader)
)
if __name__ == "__main__":
MAX_RETRIES = 3
MAX_THREADS = 5
LOCATION = "us"
logger.info(f"Crawl starting...")
## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet", "web3 wallet"]
aggregate_files = []
## Job Processes
filename = "report.csv"
crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
logger.info("Starting scrape...")
process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Scrape Complete")
Step 6: Production Run
Like before, we're ready for a production run. I changed "bitcoin wallet" to "web3 wallet". Otherwise, everything else is the same. Go ahead and take a look at our main block.
if __name__ == "__main__":
    MAX_RETRIES = 3
    MAX_THREADS = 5
    LOCATION = "us"
    logger.info(f"Crawl starting...")
    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet", "web3 wallet"]
    aggregate_files = []
    ## Job Processes
    filename = "report.csv"
    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")
    logger.info("Starting scrape...")
    process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    logger.info("Scrape Complete")
Just like before, feel free to change the constants to alter your results. Here are our results.
Our entire operation finished in 113.846 seconds, and the crawl generated a report with 21 apps. Our crawl earlier took 18.165 seconds. 113.846 - 18.165 = 95.681 seconds. 95.681 seconds / 21 apps = 4.556 seconds per app. That is about twice as fast per page as our crawl, which took roughly 9 seconds per search!
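If you want to repeat this calculation for your own runs, the arithmetic can be sketched in a few lines. The numbers are the ones reported above, and the variable names are just for illustration. Like the text, it assumes the crawl took the same 18.165 seconds as in the earlier run.

```python
total_seconds = 113.846   # full run: crawl plus scrape
crawl_seconds = 18.165    # crawl time measured in the earlier run
apps = 21                 # apps in the generated report

# Time spent scraping app pages, then the average per app.
scrape_seconds = total_seconds - crawl_seconds
seconds_per_app = scrape_seconds / apps

print(round(scrape_seconds, 3), round(seconds_per_app, 3))
```

Swap in your own timings and app count to compare different thread counts or locations.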
Legal and Ethical Considerations
When you access Google Play, you need to follow their terms of service and respect their robots.txt.
Violating these terms can result in suspension or even permanent removal of your account.
Their terms of service are available here. You can view their robots.txt here.
Public data is generally alright to scrape. When data is public (not gated behind a login), it is public knowledge and public property.
When accessing data behind a login, you are accessing private data and therefore subject to their terms.
If you don't know if your scraper is legal, you need to consult an attorney.
Conclusion
You've made it to the end! You now know how to build a Google Play results crawler and a Google Play app scraper. You know how to add parsing, data storage, concurrency and proxy integration to both of these.
You should also have a decent understanding of Python Requests and BeautifulSoup. Take these new skills and go build something! Track the stats we scraped here and plan out a successful Play Store app.
To learn more about the tech stack we used to write this article, take a look at the links below!
More Python Web Scraping Guides
At ScrapeOps, not only do we have a great proxy API, we also have a ton of learning resources and they're all available to you for free!
If you want to learn more about Python web scraping in general, we wrote the Python Web Scraping Playbook on it!
If you'd like to learn more from our "How To Scrape" series, check out the articles listed below.