

How to Scrape Google Play With Requests and BeautifulSoup

Google Play is the largest app store in the world. A huge share of the apps people use, whether on a computer or a smartphone, is available for download via the Play Store. When we scrape Google Play, we can collect all sorts of data such as ratings, reviews, publishers and more.

In this tutorial, we're going to build a search crawler and an app scraper to get data from Google Play.


TLDR - How to Scrape Google Play

Need to scrape Google Play? Don't have time to read? Use the prebuilt scraper we have right here.

  1. First, you'll need to create a new folder with a config.json file inside.
  2. Inside the config file, add your ScrapeOps API key: {"api_key": "your-super-secret-api-key"}.
  3. Then, copy and paste the code below into a new Python file. You can run the file with python name_of_your_file.py.
import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
    config = json.load(config_file)
    API_KEY = config["api_key"]


def get_scrapeops_url(url, location="us"):
    payload = {
        "api_key": API_KEY,
        "url": url,
        "country": location,
        "wait": 5000,
        "residential": True
    }
    proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
    return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SearchData:
    name: str = ""
    stars: float = 0
    url: str = ""
    publisher: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


@dataclass
class ReviewData:
    name: str = ""
    date: str = ""
    stars: int = 0
    description: str = ""

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            # Check string fields
            if isinstance(getattr(self, field.name), str):
                # If empty set default text
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                # Strip any trailing spaces, etc.
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


class DataPipeline:

    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)

            if not file_exists:
                writer.writeheader()

            for item in data_to_save:
                writer.writerow(asdict(item))

        self.csv_file_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()


def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
    formatted_keyword = keyword.replace(" ", "+")
    url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
    tries = 0
    success = False

    while tries <= retries and not success:
        try:
            scrapeops_proxy_url = get_scrapeops_url(url, location=location)
            response = requests.get(scrapeops_proxy_url)
            logger.info(f"Received [{response.status_code}] from: {url}")
            if response.status_code != 200:
                raise Exception(f"Failed request, Status Code {response.status_code}")
            ## Extract Data

            soup = BeautifulSoup(response.text, "html.parser")
            div_cards = soup.select("div[role='listitem']")

            Excluded_words = ["Apps & games", "Movies & TV", "Books"]
            for div_card in div_cards:
                if div_card.text in Excluded_words:
                    continue
                info_rows = div_card.select("div div span")

                name = info_rows[1].text
                publisher = info_rows[2].text
                href = div_card.find("a").get("href")
                link = f"https://play.google.com{href}"
                rating = 0.0
                if info_rows[3].text != None:
                    rating = info_rows[3].text

                search_data = SearchData(
                    name=name,
                    stars=rating,
                    url=link,
                    publisher=publisher
                )

                data_pipeline.add_data(search_data)
            logger.info(f"Successfully parsed data from: {url}")
            success = True

        except Exception as e:
            logger.error(f"An error occurred while processing page {url}: {e}")
            logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        executor.map(
            scrape_search_results,
            keywords,
            [location] * len(keywords),
            [data_pipeline] * len(keywords),
            [retries] * len(keywords)
        )


def process_app(row, location, retries=3):
    url = row["url"]
    tries = 0
    success = False

    while tries <= retries and not success:
        response = requests.get(get_scrapeops_url(url, location=location))
        try:
            if response.status_code == 200:
                logger.info(f"Status: {response.status_code}")

                soup = BeautifulSoup(response.text, "html.parser")
                review_container = soup.select_one("div[data-g-id='reviews']")
                review_headers = review_container.find_all("header")
                review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
                for review in review_headers:
                    stars = len(review.find_all("svg"))
                    card = review.parent

                    divs = card.select("div div div div div")

                    name = divs[1].text
                    date = divs[10].text
                    description = divs[12].text

                    review_data = ReviewData(
                        name=name,
                        date=date,
                        stars=stars,
                        description=description
                    )

                    review_pipeline.add_data(review_data)
                review_pipeline.close_pipeline()
                success = True

            else:
                logger.warning(f"Failed Response: {response.status_code}")
                raise Exception(f"Failed Request, status code: {response.status_code}")
        except Exception as e:
            logger.error(f"Exception thrown: {e}")
            logger.warning(f"Failed to process page: {row['url']}")
            logger.warning(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        raise Exception(f"Max Retries exceeded: {retries}")
    else:
        logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
    logger.info(f"processing {csv_file}")
    with open(csv_file, newline="") as file:
        reader = list(csv.DictReader(file))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            executor.map(
                process_app,
                reader,
                [location] * len(reader),
                [retries] * len(reader)
            )


if __name__ == "__main__":

    MAX_RETRIES = 3
    MAX_THREADS = 5

    LOCATION = "us"

    logger.info(f"Crawl starting...")

    ## INPUT ---> List of keywords to scrape
    keyword_list = ["crypto wallet", "web3 wallet"]
    aggregate_files = []

    ## Job Processes
    filename = "report.csv"

    crawl_pipeline = DataPipeline(csv_filename=filename)
    start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    crawl_pipeline.close_pipeline()
    logger.info(f"Crawl complete.")

    logger.info("Starting scrape...")
    process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
    logger.info("Scrape Complete")

This code first generates a search report, report.csv. It then generates an individual review report for each app saved in report.csv. Feel free to adjust any of the following constants to fit your scraping project.

  • MAX_RETRIES: Sets the maximum number of retry attempts the script will make if a request fails.
  • MAX_THREADS: Sets the maximum number of threads (or concurrent tasks) that the script will use when scraping data.
  • LOCATION: Specifies the country code for the location from which you want to simulate the scraping requests.
  • keyword_list: A list of keywords or phrases that the script will use to search for listings on the store.

How To Architect Our Google Play Scraper

Our scraper project will consist of two separate scrapers, a result crawler and an app scraper.

  • Our crawler will perform a search, then parse and store the results.
  • Our scraper will look up each app from those results, then parse and store its reviews.

Here is the process for our crawler:

  1. Parse information from a search page.
  2. Store the parsed results from the search.
  3. Concurrently execute steps one and two for multiple searches.
  4. Proxy integration will help us avoid anti-bots and roadblocks.

Here is the process for our scraper:

  1. Read the CSV file from the crawl.
  2. Parse each app from the CSV.
  3. Store the data from the parse.
  4. Concurrently run steps 2 and 3 on multiple rows simultaneously.
  5. Proxy Integration will once again get us past anti-bots.

Understanding How To Scrape Google Play

Before we build this project, we need to get a better understanding of Google Play at a high level. Google Play has results pages and it also has individual app pages.

In the coming sections, we're going to take a look at these pages in more detail and find exactly the information we'll be extracting.


Step 1: How To Request Google Play Pages

As mentioned above, there are two types of pages we need to fetch, and we fetch both with a simple GET request. Once we understand how these URLs are built, we can reconstruct them from inside our scraper. The image below shows results for the term "crypto wallet".

Here is our results page. As you can see, our URL is constructed like this:

https://play.google.com/store/search?q={keyword}&c=apps

Play Store Search Results
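
To make this concrete, here is a tiny sketch of how we'll build that URL in code later on; the keyword is just an example value:

# Minimal sketch: turn a keyword into a Play Store search URL
keyword = "crypto wallet"                      # example keyword
formatted_keyword = keyword.replace(" ", "+")  # spaces become "+" in the query string
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
print(url)
# https://play.google.com/store/search?q=crypto+wallet&c=apps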

Here is an individual app page. You can see the URL in the address bar, but luckily, we don't need to reconstruct this one. Our app URLs will be extracted during our search.

Play Store App Page


Step 2: How To Extract Data From Google Play Results and Pages

Now let's look at the HTML data we're going to be extracting. We'll start with the search results page.

As you can see below, each item on the page carries a role of listitem. When we search for these items, we'll use the CSS selector div[role='listitem']. From there, we can pull all of the data we need.

Play Store Search Results HTML Inspection
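
Here is a minimal, runnable sketch of how that selector gets used; the toy HTML below just mimics the structure the selectors expect and is not real Play Store markup:

from bs4 import BeautifulSoup

# Toy HTML standing in for a real search results page
html = """
<div role='listitem'>Apps & games</div>
<div role='listitem'><div><span></span><span>Example Wallet</span><span>Example Publisher</span><span>4.5</span></div></div>
"""

soup = BeautifulSoup(html, "html.parser")

# Each search result is a div with role='listitem'
div_cards = soup.select("div[role='listitem']")

# Category labels like "Apps & games" share this role, so we filter them out
excluded_words = ["Apps & games", "Movies & TV", "Books"]
app_cards = [card for card in div_cards if card.text not in excluded_words]
print(f"Found {len(app_cards)} app card(s)")  # -> 1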

Now, we'll look at how we're going to extract review data. All of our reviews are embedded within a div container with a data-g-id of reviews. From within this container, we'll go through and pull each individual review.

Play Store App Page HTML Inspection
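
Written as a small helper-style sketch (get_review_headers is a hypothetical name, not part of the final script), that lookup boils down to the following, assuming soup is a parsed app page:

def get_review_headers(soup):
    """Return the header element of every review on a parsed app page."""
    # The review section lives in a container with a data-g-id of 'reviews'
    review_container = soup.select_one("div[data-g-id='reviews']")
    # Each individual review begins with a header element
    return review_container.find_all("header")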


Step 3: Geolocated Data

To handle geolocation, we'll be using the ScrapeOps Proxy API. The ScrapeOps Proxy Aggregator allows us to pass in a country param.

When we choose a country, ScrapeOps will route us through a server within that country.

  • If we want to appear in the US, we'll pass "country": "us".
  • We can also pass another parameter, residential. This one is a boolean.
  • If we set "residential": True, ScrapeOps will assign us a residential IP address, which greatly decreases our likelihood of getting blocked. A minimal sketch of this payload is shown below.
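
Here is a minimal sketch of the payload we'll build later in this article; country and residential are the two geolocation-related options described above, and the API key is a placeholder:

from urllib.parse import urlencode

# Example payload for the ScrapeOps Proxy Aggregator
payload = {
    "api_key": "your-super-secret-api-key",  # placeholder key
    "url": "https://play.google.com/store/search?q=crypto+wallet&c=apps",
    "country": "us",        # route the request through a US-based server
    "residential": True     # ask for a residential IP instead of a datacenter IP
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)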

Setting Up Our Google Play Scraper Project

Let's get started. You can run the following commands to get set up.

Create a New Project Folder

mkdir google-play-scraper

cd google-play-scraper

Create a New Virtual Environment

python -m venv venv

Activate the Environment

source venv/bin/activate

Install Our Dependencies

pip install requests
pip install beautifulsoup4

Build A Google Play Search Crawler

It's finally time to start building. We'll begin with the crawler: first a parser, then everything else layered on top of it. We'll add the following in order.

  1. Parsing
  2. Data Storage
  3. Concurrency
  4. Proxy Integration

Step 1: Create Simple Search Data Parser

We'll get started by creating an initial parser. The code we create here will give us the basic structure to build off of for the rest of the project. We'll add retries, error handling and our initial parsing function.

While the basic structure is important, what you should really pay attention to here is the parsing function, scrape_search_results().

Here is our starter script.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")

Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")

name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text

search_data = {
"name": name,
"stars": rating,
"url": link,
"publisher": publisher
}

print(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, retries=3):
for keyword in keywords:
scrape_search_results(keyword, location, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet"]
aggregate_files = []

## Job Processes
filename = "report.csv"

start_scrape(keyword_list, LOCATION, retries=MAX_RETRIES)
logger.info(f"Crawl complete.")

In scrape_search_results(), while the operation hasn't succeeded, we do the following:

  • div_cards = soup.select("div[role='listitem']") finds all div tags with the role, listitem.
  • We use an array of excluded words to filter out unwanted div cards.
  • info_rows = div_card.select("div div span") finds all of the info rows inside each result card.
  • We then pull the name, publisher and rating from our info_rows.
  • We also pull the href element with href = div_card.find("a").get("href") and use some basic string formatting to reconstruct the full link.
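
As an optional aside, urljoin from the standard library builds the same full link without manual string formatting; this is just an alternative to the f-string used above, shown with a made-up href:

from urllib.parse import urljoin

href = "/store/apps/details?id=com.example.app"  # hypothetical href pulled from a card
link = urljoin("https://play.google.com", href)
print(link)
# https://play.google.com/store/apps/details?id=com.example.app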

Step 2: Storing the Scraped Data

After parsing data, we need to store it. Without storage, our parsing function is pretty much useless. When we store our data, we can review the CSV file later. Not only can we review the file ourselves, but our app scraper will also be able to look up each app from that CSV.

For proper storage, we're going to need to create a couple different classes, SearchData and DataPipeline.

Here is SearchData. We'll use it to hold data for individual search items.

@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

Here is our DataPipeline. It opens a pipeline to a CSV file, takes in dataclass objects, and pipes them into the CSV while filtering out duplicates.

class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

In our full code example below, we open a DataPipeline and pass it into start_scrape(), which in turn passes it into scrape_search_results(). From within scrape_search_results(), instead of printing our data to the terminal, we use it to create a SearchData object. That object then gets passed into our DataPipeline.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")

Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")

name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text

search_data = SearchData(
name=name,
stars=rating,
url=link,
publisher=publisher
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")


def start_scrape(keywords, location, data_pipeline=None, retries=3):
for keyword in keywords:
scrape_search_results(keyword, location, data_pipeline=data_pipeline, retries=retries)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet"]
aggregate_files = []

## Job Processes
filename = "report.csv"

crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
  • crawl_pipeline = DataPipeline(csv_filename=filename) creates a DataPipeline.
  • From within scrape_search_results(), we turn our parsed data into a SearchData object and pass it into the pipeline.
  • After we've completed the crawl, we close the pipeline with crawl_pipeline.close_pipeline().
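
To see the pipeline in isolation, here is a small usage sketch with made-up values; it relies only on the SearchData and DataPipeline classes defined above:

# Open a pipeline, feed it dataclass objects, then close it
pipeline = DataPipeline(csv_filename="example-report.csv")

pipeline.add_data(SearchData(
    name="Example Wallet",  # hypothetical app name
    stars=4.5,
    url="https://play.google.com/store/apps/details?id=com.example.wallet",
    publisher="Example Publisher"
))

# close_pipeline() flushes anything still sitting in the storage queue to the CSV
pipeline.close_pipeline()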

Step 3: Adding Concurrency

The next portion of our project is to add concurrency. At the moment, we use a for loop to iterate through our keyword_list. In this section, we're going to replace that for loop with ThreadPoolExecutor, which gives us the power of multithreading.

Here is our refactored start_scrape() function.

def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[location] * len(keywords),
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)

All of the real logic is happening inside of executor.map():

  • scrape_search_results is the function we'd like to call on our available threads.
  • keywords is an array of keywords we want to search.
  • All other arguments get passed in as arrays and subsequently get passed into scrape_search_results on each call.
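
If the parallel-array pattern looks strange, this stripped-down example (unrelated to the scraper itself) shows how executor.map lines the iterables up, one element from each per call:

import concurrent.futures

def greet(name, greeting):
    return f"{greeting}, {name}!"

names = ["alice", "bob", "carol"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(
        greet,
        names,                   # first argument for each call
        ["Hello"] * len(names)   # second argument, repeated once per call
    )
    print(list(results))
# ['Hello, alice!', 'Hello, bob!', 'Hello, carol!']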

Here is our fully updated Python script.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False

while tries <= retries and not success:
try:
response = requests.get(url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")

Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")

name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text

search_data = SearchData(
name=name,
stars=rating,
url=link,
publisher=publisher
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[location] * len(keywords),
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet"]
aggregate_files = []

## Job Processes
filename = "report.csv"

crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")
  • We replaced the for loop from start_scrape() with ThreadPoolExecutor.
  • Our first argument to executor.map() is the function we want to call.
  • All other arguments are passed in as arrays whose elements get fed into that function on each call.

Step 4: Bypassing Anti-Bots

Time to unleash the power of the proxy. The ScrapeOps Proxy API routes our requests through its servers so we appear to be browsing from a different location. We pass our URL and a few other arguments into the function below, and it uses simple string formatting to give us back a proxied URL.

def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url
  • "api_key": holds our ScrapeOps API key.
  • "url": is the url that we'd like to scrape.
  • "country": is the country we'd like to appear in.
  • "wait": is how long we want the ScrapeOps server to wait before sending our response back.
  • "residential": is a boolean that lets ScrapeOps know if we want a residential IP. If we set it to True, we get a residential IP instead of a datacenter IP address. This greatly decreases our likelihood of getting blocked. A short usage example follows this list.
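
As a quick sanity check, you can print the URL this function produces; the output below is truncated and uses a placeholder key, but the overall shape is what matters:

target = "https://play.google.com/store/search?q=crypto+wallet&c=apps"
print(get_scrapeops_url(target, location="us"))
# https://proxy.scrapeops.io/v1/?api_key=...&url=https%3A%2F%2Fplay.google.com%2F...&country=us&wait=5000&residential=True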

Once our crawler is all put together, it looks like this.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")

Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")

name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text

search_data = SearchData(
name=name,
stars=rating,
url=link,
publisher=publisher
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[location] * len(keywords),
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)


if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet"]
aggregate_files = []

## Job Processes
filename = "report.csv"

crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

Our crawler is now ready for production testing.


Step 5: Production Run

Time for the production run. I'm going to add another keyword to our list in the main block. Aside from that, everything else will stay the same.

Take a look below.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet", "bitcoin wallet"]
aggregate_files = []

## Job Processes
filename = "report.csv"

crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

Feel free to change any of the following to tweak your results:

  • MAX_THREADS
  • MAX_RETRIES
  • LOCATION
  • keyword_list

Here are our results from two searches.

Crawler Performance Terminal

We generated a CSV file with 23 results in 18.165 seconds. This comes out to roughly 9 seconds per search. Considering that the ScrapeOps server is waiting for 5 of those 9 seconds, this isn't bad.


Build A Google Play Scraper

Now, we're going to build an app scraper. This scraper will read our CSV file, then look up and parse each app from the file. It will then create a new file for each app containing that app's reviews.

Here are the steps we'll go through.

  1. Create a parsing function.
  2. Load the URLs to scrape.
  3. Store the newly parsed data.
  4. Add concurrency to the scraper.
  5. Integrate with the ScrapeOps proxy.

Step 1: Create Simple App Data Parser

Our basic parser is pretty similar to our first one. We have some basic error handling, retries, and our initial parsing logic. Just like earlier, the parsing logic is where you really need to pay attention.

Here is our parsing function.

def process_app(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
review_container = soup.select_one("div[data-g-id='reviews']")
review_headers = review_container.find_all("header")

for review in review_headers:
stars = len(review.find_all("svg"))
card = review.parent

divs = card.select("div div div div div")

name = divs[1].text
date = divs[10].text
description = divs[12].text

review_data = {
"name": name,
"date": date,
"stars": stars,
"description": description
}

print(review_data)

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")
  • First, we find our review_container with soup.select_one("div[data-g-id='reviews']").
  • Next we find a list of header elements for each review, review_container.find_all("header").
  • We then iterate through the review_headers.
  • On each header, we pull the following information:
    • stars = len(review.find_all("svg"))
    • A list of super nested divs, card.select("div div div div div")
    • We pull the name, date and description from the list of divs.
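
Here is that extraction written as a small helper-style sketch for a single review; the function name is just for illustration and is not part of the final script:

def parse_review_header(header):
    """Pull one review's data out of its header element."""
    # The scraper treats each svg icon in the header as one filled star
    stars = len(header.find_all("svg"))

    # The header's parent is the full review card; the nested divs hold
    # the reviewer name, the review date, and the review text
    card = header.parent
    divs = card.select("div div div div div")
    return {
        "name": divs[1].text,
        "date": divs[10].text,
        "stars": stars,
        "description": divs[12].text
    }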

Step 2: Loading URLs To Scrape

Now, we need to read our CSV file. Our scraper is going to read the rows of the CSV file and then pass them into the parsing function we just created. Let's make a new function, kind of similar to start_scrape(). We'll call this one process_results().

Here is process_results(). Later on, we'll replace the for loop with multithreading like we did before.

def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_app(row, location, retries=retries)

After putting everything together, this is how our code looks.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")

Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")

name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text

search_data = SearchData(
name=name,
stars=rating,
url=link,
publisher=publisher
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[location] * len(keywords),
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)


def process_app(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
review_container = soup.select_one("div[data-g-id='reviews']")
review_headers = review_container.find_all("header")

for review in review_headers:
stars = len(review.find_all("svg"))
card = review.parent

divs = card.select("div div div div div")

name = divs[1].text
date = divs[10].text
description = divs[12].text

review_data = {
"name": name,
"date": date,
"stars": stars,
"description": description
}

print(review_data)

success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_app(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet"]
aggregate_files = []

## Job Processes
filename = "report.csv"

crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

logger.info("Starting scrape...")
process_results(filename, LOCATION, retries=MAX_RETRIES)
logger.info("Scrape Complete")

Step 3: Storing the Scraped Data

To add data storage, we'll need to add another dataclass. We'll call this one ReviewData. It will hold the following fields:

  • name
  • date
  • stars
  • description
@dataclass
class ReviewData:
name: str = ""
date: str = ""
stars: int = 0
description: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

After creating this class, we can go ahead and pass ReviewData objects into another DataPipeline. You can see this in our fully updated code below.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
date: str = ""
stars: int = 0
description: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Received [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")

Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")

name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text

search_data = SearchData(
name=name,
stars=rating,
url=link,
publisher=publisher
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[location] * len(keywords),
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)


def process_app(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(url)
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
review_container = soup.select_one("div[data-g-id='reviews']")
review_headers = review_container.find_all("header")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for review in review_headers:
stars = len(review.find_all("svg"))
card = review.parent

divs = card.select("div div div div div")

name = divs[1].text
date = divs[10].text
description = divs[12].text

review_data = ReviewData(
name=name,
date=date,
stars=stars,
description=description
)

review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

for row in reader:
process_app(row, location, retries=retries)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet"]
aggregate_files = []

## Job Processes
filename = "report.csv"

crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

logger.info("Starting scrape...")
process_results(filename, LOCATION, retries=MAX_RETRIES)
logger.info("Scrape Complete")
  • ReviewData represents an individual review in our software.
  • DataPipeline saves our ReviewData to a CSV file.
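
Here is a compact sketch of that pairing with made-up review values; the per-app filename pattern mirrors the one used in process_app():

# One pipeline per app, named after the app with spaces replaced by dashes
app_name = "Example Wallet"  # hypothetical app name pulled from report.csv
review_pipeline = DataPipeline(csv_filename=f"{app_name.replace(' ', '-')}.csv")

review_pipeline.add_data(ReviewData(
    name="Some Reviewer",            # hypothetical reviewer name
    date="January 1, 2024",          # hypothetical review date
    stars=4,
    description="Example review text."
))

review_pipeline.close_pipeline()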

Step 4: Adding Concurrency

When we add concurrency, we'll do the same thing we did before. We're going to replace our for loop with ThreadPoolExecutor. Here is our refactored process_results() function.

def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_app,
reader,
[location] * len(reader),
[retries] * len(reader)
)
  • process_app is the function we want to call on multiple threads this time.
  • reader is the array of rows from our CSV file.
  • location and retries also get passed in as arrays, just like before.

Step 5: Bypassing Anti-Bots

To bypass anti-bots, we only need to change one line. get_scrapeops_url() was already defined earlier; now we simply use it inside our parsing function to unleash the proxy.

response = requests.get(get_scrapeops_url(url, location=location))

Here is our production ready code.

import os
import csv
import requests
import json
import logging
import time
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import concurrent.futures
from dataclasses import dataclass, field, fields, asdict

API_KEY = ""

with open("config.json", "r") as config_file:
config = json.load(config_file)
API_KEY = config["api_key"]



def get_scrapeops_url(url, location="us"):
payload = {
"api_key": API_KEY,
"url": url,
"country": location,
"wait": 5000,
"residential": True
}
proxy_url = "https://proxy.scrapeops.io/v1/?" + urlencode(payload)
return proxy_url


## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



@dataclass
class SearchData:
name: str = ""
stars: float = 0
url: str = ""
publisher: str = ""

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

@dataclass
class ReviewData:
name: str = ""
date: str = ""
stars: int = 0
description: str = ""


def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
# Check string fields
if isinstance(getattr(self, field.name), str):
# If empty set default text
if getattr(self, field.name) == "":
setattr(self, field.name, f"No {field.name}")
continue
# Strip any trailing spaces, etc.
value = getattr(self, field.name)
setattr(self, field.name, value.strip())


class DataPipeline:

def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
data_to_save = []
data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not data_to_save:
return

keys = [field.name for field in fields(data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="utf-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)

if not file_exists:
writer.writeheader()

for item in data_to_save:
writer.writerow(asdict(item))

self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped.")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()



def scrape_search_results(keyword, location, data_pipeline=None, retries=3):
formatted_keyword = keyword.replace(" ", "+")
url = f"https://play.google.com/store/search?q={formatted_keyword}&c=apps"
tries = 0
success = False

while tries <= retries and not success:
try:
scrapeops_proxy_url = get_scrapeops_url(url, location=location)
response = requests.get(scrapeops_proxy_url)
logger.info(f"Recieved [{response.status_code}] from: {url}")
if response.status_code != 200:
raise Exception(f"Failed request, Status Code {response.status_code}")
## Extract Data

soup = BeautifulSoup(response.text, "html.parser")
div_cards = soup.select("div[role='listitem']")

Excluded_words = ["Apps & games", "Movies & TV", "Books"]
for div_card in div_cards:
if div_card.text in Excluded_words:
continue
info_rows = div_card.select("div div span")

name = info_rows[1].text
publisher = info_rows[2].text
href = div_card.find("a").get("href")
link = f"https://play.google.com{href}"
rating = 0.0
if info_rows[3].text != None:
rating = info_rows[3].text

search_data = SearchData(
name=name,
stars=rating,
url=link,
publisher=publisher
)

data_pipeline.add_data(search_data)
logger.info(f"Successfully parsed data from: {url}")
success = True

except Exception as e:
logger.error(f"An error occurred while processing page {url}: {e}")
logger.info(f"Retrying request for page: {url}, retries left {retries-tries}")
if not success:
raise Exception(f"Max Retries exceeded: {retries}")




def start_scrape(keywords, location, data_pipeline=None, max_threads=5, retries=3):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
scrape_search_results,
keywords,
[location] * len(keywords),
[data_pipeline] * len(keywords),
[retries] * len(keywords)
)


def process_app(row, location, retries=3):
url = row["url"]
tries = 0
success = False

while tries <= retries and not success:
response = requests.get(get_scrapeops_url(url, location=location))
try:
if response.status_code == 200:
logger.info(f"Status: {response.status_code}")

soup = BeautifulSoup(response.text, "html.parser")
review_container = soup.select_one("div[data-g-id='reviews']")
review_headers = review_container.find_all("header")
review_pipeline = DataPipeline(csv_filename=f"{row['name'].replace(' ', '-')}.csv")
for review in review_headers:
stars = len(review.find_all("svg"))
card = review.parent

divs = card.select("div div div div div")

name = divs[1].text
date = divs[10].text
description = divs[12].text

review_data = ReviewData(
name=name,
date=date,
stars=stars,
description=description
)

review_pipeline.add_data(review_data)
review_pipeline.close_pipeline()
success = True

else:
logger.warning(f"Failed Response: {response.status_code}")
raise Exception(f"Failed Request, status code: {response.status_code}")
except Exception as e:
logger.error(f"Exception thrown: {e}")
logger.warning(f"Failed to process page: {row['url']}")
logger.warning(f"Retries left: {retries-tries}")
tries += 1
if not success:
raise Exception(f"Max Retries exceeded: {retries}")
else:
logger.info(f"Successfully parsed: {row['url']}")


def process_results(csv_file, location, max_threads=5, retries=3):
logger.info(f"processing {csv_file}")
with open(csv_file, newline="") as file:
reader = list(csv.DictReader(file))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
executor.map(
process_app,
reader,
[location] * len(reader),
[retries] * len(reader)
)

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet", "web3 wallet"]
aggregate_files = []

## Job Processes
filename = "report.csv"

crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

logger.info("Starting scrape...")
process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Scrape Complete")

Step 6: Production Run

Like before, we're now ready for a production run. This time, I added "web3 wallet" to the keyword list. Otherwise, everything else is the same. Go ahead and take a look at our main.

if __name__ == "__main__":

MAX_RETRIES = 3
MAX_THREADS = 5

LOCATION = "us"

logger.info(f"Crawl starting...")

## INPUT ---> List of keywords to scrape
keyword_list = ["crypto wallet", "web3 wallet"]
aggregate_files = []

## Job Processes
filename = "report.csv"

crawl_pipeline = DataPipeline(csv_filename=filename)
start_scrape(keyword_list, LOCATION, data_pipeline=crawl_pipeline, max_threads=MAX_THREADS, retries=MAX_RETRIES)
crawl_pipeline.close_pipeline()
logger.info(f"Crawl complete.")

logger.info("Starting scrape...")
process_results(filename, LOCATION, max_threads=MAX_THREADS, retries=MAX_RETRIES)
logger.info("Scrape Complete")

Just like before, feel free to change the constants to alter your results. Here are our results.

Scraper Performance Terminal

Our entire operation finished in 113.846 seconds, and the crawl generated a report with 21 apps. Our crawl earlier took 18.165 seconds. 113.846 - 18.165 = 95.681 seconds. 95.681 seconds / 21 apps = 4.556 seconds per app. This is about twice as fast as our crawl!
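
If you want to redo that back-of-the-envelope math for your own runs, the calculation is simply:

# Rough per-app cost: subtract the crawl time from the total runtime,
# then divide by the number of apps that were scraped.
total_runtime = 113.846   # seconds for the whole run
crawl_runtime = 18.165    # seconds for the crawl alone
apps_scraped = 21

scrape_runtime = total_runtime - crawl_runtime    # 95.681 seconds
print(round(scrape_runtime / apps_scraped, 3))    # ~4.556 seconds per app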


Legal and Ethical Considerations

When you access Google Play, you need to follow their terms of service and respect their robots.txt. Violating these terms can result in suspension or even permanent removal of your account.

Their terms of service are available here. You can view their robots.txt here.
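
If you're unsure whether a page is off limits, you can pull the robots.txt yourself with the same requests library used throughout this tutorial. This is only a quick sanity check (and not legal advice); by convention, robots.txt lives at the root of the domain:

# Quick look at Google Play's robots.txt rules using requests.
import requests

robots_response = requests.get("https://play.google.com/robots.txt")
print(robots_response.text[:500])   # print the first few rules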

Public data is generally alright to scrape. When data is public (not gated behind a login), it is public knowledge and public property.

When accessing data behind a login, you are accessing private data and therefore subject to their terms.

If you don't know if your scraper is legal, you need to consult an attorney.


Conclusion

You've made it to the end! You now know how to build a Google Play results crawler and a Google Play app scraper. You know how to add parsing, data storage, concurrency and proxy integration to both of these.

You should also have a decent understanding of Python Requests and BeautifulSoup. Take these new skills and go build something! Track the stats we scraped here and plan out a successful Play Store app.

To learn more about the tech stack we used to write this article, take a look at the links below!


More Python Web Scraping Guides

At ScrapeOps, not only do we have a great proxy API, we also have a ton of learning resources and they're all available to you for free!

If you want to learn more about Python web scraping in general, check out our Python Web Scraping Playbook!

If you'd like to learn more from our "How To Scrape" series, check out the articles listed below.