

How to Scrape Google Search With Python Requests and BeautifulSoup

When scraping the web, Google is a goldmine. While you're almost certainly familiar with Google and its convenience as a search engine, you may not have considered how useful it is as a scraping target.

If you can scrape a search engine, you can essentially build your own data mining operation, and you can use the results to identify other sites worth scraping. In a data-driven world, this is an incredibly lucrative skill to have.

In this guide, we'll walk through everything from a simple search parser to a full production scraper with pagination, data processing, concurrency, and proxy support.


TLDR - How to Scrape Google Search With Python Requests

When scraping search results, pay attention to the following things:

  • We get uniform results nested inside of <div> elements.
  • Each result has its own <h3> tag.
  • Each result comes with an href that links to a website.
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs, urlencode
import csv
import concurrent
from concurrent.futures import ThreadPoolExecutor
import os
import logging
import time
from dataclasses import dataclass, field, fields, asdict

#our default user agent
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#holds the data we extract for each search result
@dataclass
class SearchData:
    name: str
    base_url: str
    link: str
    page: int
    result_number: int

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


#queues results, drops duplicates, and writes everything to a CSV file
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_open = False

    def save_to_csv(self):
        self.csv_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename)

        with open(self.csv_filename, mode="a", newline="", encoding="UTF-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate Item Found: {input_data.name}. Item dropped")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

#convert a regular url into a ScrapeOps proxied url
def get_scrapeops_url(url):
    payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
    proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
    return proxy_url

#scrape a single page of search results and feed it to the pipeline
def search_page(query, page, location="United States", headers=headers, pipeline=None, num=100, retries=3):
    url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}"
    payload = {
        "api_key": API_KEY,
        "url": url,
    }
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            response = requests.get(get_scrapeops_url(url))
            soup = BeautifulSoup(response.text, 'html.parser')
            divs = soup.find_all("div")
            index = 0
            last_link = ""
            for div in divs:
                h3s = div.find_all("h3")
                if len(h3s) > 0:
                    link = div.find("a", href=True)
                    parsed_url = urlparse(link["href"])
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    site_info = {'title': h3s[0].text, "base_url": base_url, 'link': link["href"], "page": page, "result_number": index}
                    search_data = SearchData(
                        name=site_info["title"],
                        base_url=site_info["base_url"],
                        link=site_info["link"],
                        page=site_info["page"],
                        result_number=site_info["result_number"]
                    )
                    if site_info["link"] != last_link:
                        index += 1
                        last_link = site_info["link"]
                        if pipeline:
                            pipeline.add_data(search_data)
            success = True

        except:
            print(f"Failed to scrape page {page}")
            print(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        print(f"Failed to scrape page {page}, no retries left")
        raise Exception(f"Max retries exceeded: {retries}")
    else:
        print(f"Scraped page {page} with {retries-tries} retries left")

#open a thread pool and scrape each page concurrently
def full_search(query, pages=3, location="us", MAX_THREADS=5, MAX_RETRIES=3, num=10):
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        pipeline = DataPipeline(csv_filename=f"{query.replace(' ', '-')}.csv")
        tasks = [executor.submit(search_page, query, page, location, None, pipeline, num, MAX_RETRIES) for page in range(pages)]
        for future in tasks:
            future.result()
        pipeline.close_pipeline()

if __name__ == "__main__":
    MAX_THREADS = 5
    MAX_RETRIES = 5
    queries = ["cool stuff"]

    logger.info("Starting full search...")
    for query in queries:
        full_search(query, pages=3, num=10)
    logger.info("Search complete.")


When running this code, simply change the full_search() line:

full_search(query, pages=20)

If you'd like to search for 100 pages of boring stuff, simply change it to:

full_search(query, pages=100)

And add boring stuff to your queries list: ["cool stuff", "boring stuff"]

Other things you can tweak are:

  • location
  • MAX_THREADS
  • MAX_RETRIES
  • num

If you'd like to run this code, simply copy and paste it into a Python script, e.g. yourscript.py (you can name it whatever you want). Once you have your script saved and your dependencies installed, run the following command:

python yourscript.py

This will output a CSV file containing your search results.


How To Architect Our Google Scraper

There are many different tools we can use to scrape Google Search. In this article, we'll focus on Requests and BeautifulSoup. Our scraper needs to be able to do the following:

  1. Perform a GET request for a Google Search
  2. Interpret the results
  3. Save each result's number, title, and URL

For starters, we need to build a simple parser to pull the information out of Google's nested divs. Once we have a parser, we'll add pagination. Next, we'll add data processing and concurrency. Afterward, we'll add a proxy. Once we're ready for the production run, we'll clean everything up a bit more and improve our data processing as well.

Remember, our scraper needs:

  • Parsing
  • Pagination
  • Data Processing
  • Concurrency
  • Proxy

Before plunging head first into code, we're going to talk about how our scraper works at a high level. In this section, we'll go over the required steps in greater detail. If you've got some experience in web scraping already, feel free to skip this section.


Step 1: How To Request Google Search Pages

Let's start by taking a look at Google Search results manually. The screenshot below is the result of searching the term "cool stuff".

Google Search Results Page for Cool Stuff

As you can see in the address bar, we send the following GET request:

https://www.google.com/search?q=cool+stuff

The actual endpoint we're hitting is https://www.google.com/search. At the end, you should notice ?q=cool+stuff.

In the address bar, a question mark (?) marks the start of the query string, and each parameter is a name-value pair separated by an equals sign (=). In this case, the parameter is q, Google's search query.

So, ?q=cool+stuff means that our search query is for cool stuff. If we wanted to search for boring stuff, we could instead use ?q=boring+stuff.
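
You don't have to build this query string by hand. As a quick sketch (not part of the scraper we build below), Python's urllib.parse.urlencode handles the encoding for us, turning spaces into + and escaping special characters; the build_search_url() helper name here is purely for illustration.

from urllib.parse import urlencode

def build_search_url(query):
    #urlencode turns {"q": "cool stuff"} into "q=cool+stuff"
    return "https://www.google.com/search?" + urlencode({"q": query})

print(build_search_url("cool stuff"))    #https://www.google.com/search?q=cool+stuff
print(build_search_url("boring stuff"))  #https://www.google.com/search?q=boring+stuff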

In the days of old, at the bottom of our page, we would see a list of page numbers. This made search results incredibly easy to scrape.

While Google doesn't exactly give us page numbers anymore, it does give us a start parameter that we can use to paginate our results. We get our results in batches of 10. With variables figured in, our url will look like this:

https://www.google.com/search?q={query}&start={page * 10}

Google also gives us a num parameter that we can use to control the number of results we get back. Taking num into account, our url looks more like this:

https://www.google.com/search?q={query}&start={page * num}&num={num}

We can set num up to 100 results, but Google's response doesn't always honor it. Multiple times throughout the writing of this article, I've used num=100 and been blocked or gotten fewer results than requested. Other times I have gotten proper results.
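
To make the pagination math concrete, here's a small sketch that builds the URL for each page using the start and num convention described above; the paginated_url() helper is ours, purely for illustration.

from urllib.parse import urlencode

def paginated_url(query, page, num=10):
    #start is the offset of the first result on the requested page
    params = {"q": query, "start": page * num, "num": num}
    return "https://www.google.com/search?" + urlencode(params)

for page in range(3):
    print(paginated_url("cool stuff", page))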


Step 2: How To Extract Data From Google Search Results

As you saw in the screenshot earlier, all of our results come with an <h3> tag. To find our results, we can simply use BeautifulSoup's .find_all() method. Some websites like to nest a bunch of different things inside of an element, and Google is no exception.

Here is the full HTML of our first result: <h3 class="LC20lb MBeuO DKV0Md">Cool Stuff</h3>. As you can see, the class name is a bunch of jumbled garbage and there is no link within the tag! This is because Google (like many other sites) nests all of our important information within a <div>.

If the class name of each result were more legible and not subject to change, I would recommend using it as a way to parse the result. Since the class name is likely to change over time, we're simply going to get all of the <div> elements and find the <h3> elements nested inside of them. We'll use soup.find_all() along with a last_link variable.

For each result we get, we'll compare its link to the last link. If the current link is the same as the last link, we'll ignore this element and move on to the next one.
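
Put together, the parsing approach looks roughly like the sketch below (assuming html is a string of search-results HTML you've already fetched). It walks every <div>, keeps the ones that contain an <h3> and a link, and skips an entry when its link matches the previous one.

from bs4 import BeautifulSoup

def parse_results(html):
    soup = BeautifulSoup(html, "html.parser")
    results = []
    last_link = ""
    for div in soup.find_all("div"):
        h3 = div.find("h3")
        link = div.find("a", href=True)
        #skip divs that don't look like a search result
        if not h3 or not link:
            continue
        #skip duplicates caused by Google's deeply nested divs
        if link["href"] == last_link:
            continue
        results.append({"title": h3.text, "link": link["href"]})
        last_link = link["href"]
    return results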


Step 3: How To Control Pagination

As we touched on briefly earlier, Google no longer gives us real page numbers. What it does give us is a result offset. Our results tend to come in batches of 10, so it's quite easy to parse our data in a page-like fashion.

Take a closer look at our url format again:

https://www.google.com/search?q={query}&start={page * 10}

We simply multiply our page number by 10. Using this method, we'll fetch results 1 through 10, then 11 through 20, and so on and so forth. In testing, Google has occasionally given us up to 12 results per page, but that's ok. Even if we get duplicates, we can remove them when we handle the data.


Step 4: Geolocated Data

Some websites will return different results depending on our location. Once again, Google is no exception. To add a location, we can simply add the geo_location parameter to our request.

At the moment, our full request looks like this:

https://www.google.com/search?q={query}&start={page * 10}

Here is what it looks like with our location added:

https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}

All in all, it's a pretty simple change.
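
As a quick sketch, the location is just one more key in the query string (later on, location actually gets handled by the ScrapeOps proxy's country parameter instead):

from urllib.parse import urlencode

query = "cool stuff"
page = 0
location = "United States"

#same URL as before, with geo_location appended as one more parameter
params = {"q": query, "start": page * 10, "geo_location": location}
print("https://www.google.com/search?" + urlencode(params))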


Setting Up Our Google Scraper Project

Now that you've got a basic understanding of the process, it's time to begin setting up our project. We'll start by creating a new project folder. We can call it google-search-requests.

You can create a new folder through your file explorer or enter the following command:

mkdir google-search-requests

Next, we need to create a virtual environment. I'll be using Python3.10-venv.

First, we'll create a new virtual environment:

Linux/Mac

python3 -m venv google-search

Windows

python -m venv google-search

Once we've got our new environment created, let's activate it:

Linux/Mac

source google-search/bin/activate

Windows

.\google-search\Scripts\Activate.ps1

Once your venv is activated, it's time to install individual dependencies. This command will install both requests and beautifulsoup4.

pip install requests beautifulsoup4

Step 1: Create Simple Search Data Parser

We've already been through our base logic. Let's create an initial script that we can build from.

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs, urlencode

#search a single page
def google_search(query, retries=3):
    tries = 0
    #runtime loop for the scrape
    while tries <= retries:
        try:
            url = f"https://www.google.com/search?q={query}"
            response = requests.get(url)
            results = []
            last_link = ""
            soup = BeautifulSoup(response.text, 'html.parser')
            index = 0
            for result in soup.find_all('div'):
                title = result.find('h3')
                if title:
                    title = title.text
                else:
                    continue
                base_url = ""
                link = result.find('a', href=True)
                if link:
                    link = link['href']
                    parsed_url = urlparse(link)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                else:
                    continue
                #this is the full site info we wish to extract
                site_info = {'title': title, "base_url": base_url, 'link': link, "result_number": index}
                #only keep the result if it isn't a repeat of the last link
                if last_link != site_info["link"]:
                    results.append(site_info)
                    index += 1
                    last_link = link
            #return our list of results
            print(f"Finished scrape with {tries} retries")
            return results
        except:
            print("Failed to scrape the page")
            print("Retries left:", retries-tries)
            tries += 1
    #if this line executes, the scrape has failed
    raise Exception(f"Max retries exceeded: {retries}")


if __name__ == "__main__":

    MAX_RETRIES = 5
    QUERIES = ["cool stuff"]

    for query in QUERIES:
        results = google_search(query, retries=MAX_RETRIES)
        for result in results:
            print(result)

In this example, we:

  • Create a google_search() function that takes our query as a parameter
  • When we get the result, BeautifulSoup(response.text, 'html.parser') creates a BeautifulSoup instance to parse through the HTML
  • soup.find_all("div") finds all the <div> objects
  • result.find("h3") is used to find the header element of each result
  • link = result.find('a', href=True) extracts the link from the result
  • urlparse(link) parses our link
  • base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" reconstructs the base_url so we can save it
  • We then create a dict, site_info from the data we've extracted
  • If the link from site_info is different than last_link, we add our result to the results list
  • After parsing through the response and creating our list, we return the results list

Step 2: Add Pagination

Now that our basic parser works, let's add pagination so we can request more than one page of results.

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs

def google_search(query, pages=3, location="United States", retries=3):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    results = []
    last_link = ""
    for page in range(0, pages):
        tries = 0
        success = False

        while tries <= retries and not success:

            try:
                url = f"https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}"
                response = requests.get(url, headers=headers)
                soup = BeautifulSoup(response.text, 'html.parser')
                index = 0
                for result in soup.find_all('div'):
                    title = result.find('h3')
                    if title:
                        title = title.text
                    else:
                        continue
                    base_url = ""
                    #pull the raw link from the result
                    link = result.find('a', href=True)
                    if link:
                        link = link['href']
                        parsed_url = urlparse(link)
                        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    else:
                        continue
                    #this is the full site info we wish to extract
                    site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                    #if the link is different from the last link
                    if last_link != site_info["link"]:
                        results.append(site_info)
                        index += 1
                        last_link = link
                print(f"Scraped page {page} with {retries-tries} retries left")
                success = True

            except:
                print(f"Failed to scrape page {page}")
                print(f"Retries left: {retries-tries}")
                tries += 1

        if not success:
            raise Exception(f"Max retries exceeded: {retries}")

    #return all pages of results
    return results

if __name__ == "__main__":

    MAX_RETRIES = 5
    QUERIES = ["cool stuff"]

    for query in QUERIES:
        results = google_search(query, retries=MAX_RETRIES)
        for result in results:
            print(result)

There's a lot going on in the code above. While it's not much different from our initial prototype, there is one really important part to pay attention to here: &start={page * 10}. This is the basis for how we batch our results. We also add in functionality for our geo_location, but by the time we add our proxy, this functionality is actually going to be moved elsewhere in our code.


Step 3: Storing the Scraped Data

As you probably noticed in our earlier examples, we store each object as a dict with key-value pairs. Here is the first result so you can see how the data is laid out:

{'title': 'Cool Stuff', 'base_url': '://', 'link': '/search?sca_esv=3d5aec0ebbda9031&q=cool+stuff&uds=AMwkrPusHYa-Y5lqXPwpg8jJI99FKYz2zi9dec3bfM0lH-hil3eHKWSsmwBdtnNX2uzO7rvzH_UOAG-8W6q5RMgyj5EtPQRweAkj97b7yv-dxhFjVNmTpUmjIG8LX5BTVMn1i8RvhFDaroRDPKXSl9mGzRdmu5ujMGh35B6t9hZQe5OWf6qF9qyxdHJPailq0Was2Ti5R1Efg6G0TWkZl8Q0a4QgLEUcLEh8uM-Gr_AIA73YM8e13Y_Y5x_btmkZoDODrensXIErfUplY9wGJ9in8N6PV9WQjCg77wu2IOm5pmE8706LnWQ&udm=2&prmd=isvnmbtz&sa=X&ved=2ahUKEwi459DNvrOFAxXzh1YBHfFMDlsQtKgLegQIEhAB', 'page': 0, 'result_number': 0}

Each object has a title, base_url, link, page, and result_number. Because we have uniform data stored in key-value pairs, we already have the makings of a DataFrame and therefore a CSV.

To our imports, add the following line:

import csv

Then, update the script to look like this:

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs
import csv
from os import path

def write_page_to_csv(filename, object_array):
    path_to_csv = filename
    file_exists = path.exists(filename)
    with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file:
        #name the headers after our object keys
        writer = csv.DictWriter(file, fieldnames=object_array[0].keys())
        if not file_exists:
            writer.writeheader()
        writer.writerows(object_array)

def google_search(query, pages=3, location="United States", retries=3):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    last_link = ""
    for page in range(0, pages):
        #collect this page's results separately so we only write them once
        results = []
        tries = 0
        success = False

        while tries <= retries and not success:

            try:
                url = f"https://www.google.com/search?q={query}&start={page * 10}"
                response = requests.get(url, headers=headers)
                print(f"Response Code: {response.status_code}")
                soup = BeautifulSoup(response.text, 'html.parser')
                index = 0
                for result in soup.find_all('div'):
                    title = result.find('h3')
                    if title:
                        title = title.text
                    else:
                        continue
                    base_url = ""
                    #pull the raw link from the result
                    link = result.find('a', href=True)
                    if link:
                        link = link['href']
                        parsed_url = urlparse(link)
                        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    else:
                        continue
                    #this is the full site info we wish to extract
                    site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                    #if the link is different from the last link
                    if last_link != site_info["link"]:
                        results.append(site_info)
                        index += 1
                        last_link = link
                print(f"Scraped page {page} with {retries-tries} retries left")
                write_page_to_csv(f"{query}.csv", results)
                success = True

            except:
                print(f"Failed to scrape page {page}")
                print(f"Retries left: {retries-tries}")
                tries += 1

        if not success:
            raise Exception(f"Max retries exceeded: {retries}")

if __name__ == "__main__":

    MAX_RETRIES = 5
    QUERIES = ["cool stuff"]

    for query in QUERIES:
        google_search(query, retries=MAX_RETRIES)

Pay close attention to the write_page_to_csv() function:

def write_page_to_csv(filename, object_array):
    path_to_csv = filename
    file_exists = path.exists(filename)
    with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file:
        #name the headers after our object keys
        writer = csv.DictWriter(file, fieldnames=object_array[0].keys())
        if not file_exists:
            writer.writeheader()
        writer.writerows(object_array)
  • The function above takes in an object_array (in this case our page results) and writes it to our filename.
  • If the file doesn't already exist, we create it and write a header row. If it does exist, we simply open it and append to it.
  • This allows us to put multiple page results into the same file without corrupting it.
  • When scraping at scale, we need to be able to scrape multiple pages of results and put them into the same file... This is the whole purpose of scraping to begin with... collecting data!

Here is a screenshot of the resulting file:

Scraped Google Search Results in CSV

This is almost identical to our previous iteration, with some small differences at the end:

  • We do not print our results
  • We instead write each list of page results to a csv file

It's important to append the csv file as soon as we have our results. If our scraper crashes halfway through the job, we still get some data. It's also very important to open this file in append mode so we don't overwrite any important data that we've scraped previously.


Step 4: Adding Concurrency

Now that we've got a working model from start to finish, let's focus on performance! We're going to split our google_search() function into two separate functions, search_page() and full_search(). search_page() will search a single page and full_search() will create multiple threads that call search_page() concurrently.

Add the following import statement:

from concurrent.futures import ThreadPoolExecutor

Now we'll refactor our google_search() function into our search_page() function.

def search_page(query, page, location="United States", retries=3, num=100):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    results = []
    last_link = ""
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}"
            response = requests.get(url, headers=headers)
            if response.status_code != 200:
                print("Failed server response", response.status_code)
                raise Exception("Failed server response!")
            print(f"Response Code: {response.status_code}")
            soup = BeautifulSoup(response.text, 'html.parser')
            index = 0
            for result in soup.find_all('div'):
                title = result.find('h3')
                if title:
                    title = title.text
                else:
                    continue
                base_url = ""
                #pull the raw link from the result
                link = result.find('a', href=True)
                if link:
                    link = link['href']
                    parsed_url = urlparse(link)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                else:
                    continue
                #this is the full site info we wish to extract
                site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                #if the link is different from the last link
                if last_link != site_info["link"]:
                    results.append(site_info)
                    index += 1
                    last_link = link

            write_page_to_csv(f"{query}.csv", results)
            success = True

        except:
            print(f"Failed to scrape page {page}")
            print(f"Retries left: {retries-tries}")
            tries += 1

    if not success:
        print(f"Failed to scrape page {page}, no retries left")
        raise Exception(f"Max retries exceeded: {retries}")
    else:
        print(f"Scraped page {page} with {retries} retries left")

This function:

  • Removes the pages argument and replaces it with page
  • Instead of running a for loop and iterating through pages, we simply execute our parsing logic on the page we're searching

Next, we'll create a full_search() function:

def full_search(query, pages=3, location="United States", MAX_THREADS=5, MAX_RETRIES=4, num=100):
    page_numbers = list(range(pages))
    full_results = []
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        executor.map(search_page, [query]*pages, page_numbers, [location]*pages, [MAX_RETRIES]*pages, [num]*pages)

While it may look a bit intimidating, this function is actually rather simple.

  • It takes one real argument, query. Everything else is a kwarg used to tweak our settings.
  • The scariest looking portion of this code is executor.map(). As bizarre as it looks, it's actually pretty simple.
  • It takes search_page as the first argument and the rest of the args are just parameters that we wish to pass into search_page().

This function is super important: it allows us to use a separate thread for each page. When doing this, we can scrape multiple pages at the same time.
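
One detail worth calling out: executor.map() zips its iterables together and stops at the shortest one, so every argument list needs one entry per page. Here's a stripped-down sketch of the pattern, with a stand-in function in place of search_page():

from concurrent.futures import ThreadPoolExecutor

def fake_search_page(query, page, location, retries, num):
    #stand-in for search_page() so we can see how the arguments line up
    print(f"query={query}, page={page}, location={location}, retries={retries}, num={num}")

pages = 3
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(
        fake_search_page,
        ["cool stuff"] * pages,     #same query for every page
        range(pages),               #one page number per call
        ["United States"] * pages,
        [3] * pages,
        [100] * pages
    )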

At this point, our full scraper should look like this:

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs
import csv
from os import path
from concurrent.futures import ThreadPoolExecutor


def write_page_to_csv(filename, object_array):
    path_to_csv = filename
    file_exists = path.exists(filename)
    with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file:
        #name the headers after our object keys
        writer = csv.DictWriter(file, fieldnames=object_array[0].keys())
        if not file_exists:
            writer.writeheader()
        writer.writerows(object_array)

def search_page(query, page, location="United States", retries=3, num=100):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    results = []
    last_link = ""
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}"
            response = requests.get(url, headers=headers)
            if response.status_code != 200:
                print("Failed server response", response.status_code)
                raise Exception("Failed server response!")
            print(f"Response Code: {response.status_code}")
            soup = BeautifulSoup(response.text, 'html.parser')
            index = 0
            for result in soup.find_all('div'):
                title = result.find('h3')
                if title:
                    title = title.text
                else:
                    continue
                base_url = ""
                #pull the raw link from the result
                link = result.find('a', href=True)
                if link:
                    link = link['href']
                    parsed_url = urlparse(link)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                else:
                    continue
                #this is the full site info we wish to extract
                site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                #if the link is different from the last link
                if last_link != site_info["link"]:
                    results.append(site_info)
                    index += 1
                    last_link = link

            write_page_to_csv(f"{query}.csv", results)
            success = True

        except:
            print(f"Failed to scrape page {page}")
            print(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        print(f"Failed to scrape page {page}, no retries left")
        raise Exception(f"Max retries exceeded: {retries}")
    else:
        print(f"Scraped page {page} with {retries} retries left")


def full_search(query, pages=3, location="United States", MAX_THREADS=5, MAX_RETRIES=4, num=100):
    page_numbers = list(range(pages))
    full_results = []
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        executor.map(search_page, [query]*pages, page_numbers, [location]*pages, [MAX_RETRIES]*pages, [num]*pages)

if __name__ == "__main__":

    MAX_RETRIES = 5
    QUERIES = ["cool stuff"]

    for query in QUERIES:
        full_search(query, pages=1)

If you run this code, you'll probably get output similar to the image below. The reason for this is actually really simple. Our scraper was already faster than a normal human, and we just sped it up even more! Google recognizes the fact that our scraper doesn't appear human at all, so they block us.

Retries & Concurrency


Step 5: Bypassing Anti-Bots

As you probably recall from the last section, now that our requests are coming in really fast, we get blocked. The ScrapeOps Proxy is perfect for addressing this issue.

With the ScrapeOps Proxy, we get rotating IP addresses and a middleman server. Because we're communicating with a server in the middle, this also slows down the rate at which we make our requests. In short, our requests are spaced apart, and each one comes from a different IP address. This makes it nearly impossible to identify and block our scraper.

In the code below, we create a simple function, get_scrapeops_url(). It just performs some basic string formatting for us, but it is vital to our scraper. We now have the ability to convert any url into a proxied url with very minimal impact on our overall code. With this function, we can now run our Python script without getting blocked!

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs, urlencode
import csv
from os import path
from concurrent.futures import ThreadPoolExecutor

#our default user agent
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"


def get_scrapeops_url(url, location='us'):
    payload = {'api_key': API_KEY, 'url': url, 'country': location}
    proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
    return proxy_url

def write_page_to_csv(filename, object_array):
    path_to_csv = filename
    file_exists = path.exists(filename)
    with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file:
        #name the headers after our object keys
        writer = csv.DictWriter(file, fieldnames=object_array[0].keys())
        if not file_exists:
            writer.writeheader()
        writer.writerows(object_array)

def search_page(query, page, location="United States", retries=3, num=100):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    results = []
    last_link = ""
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}"
            response = requests.get(get_scrapeops_url(url), headers=headers)
            if response.status_code != 200:
                print("Failed server response", response.status_code)
                raise Exception("Failed server response!")
            print(f"Response Code: {response.status_code}")
            soup = BeautifulSoup(response.text, 'html.parser')
            index = 0
            for result in soup.find_all('div'):
                title = result.find('h3')
                if title:
                    title = title.text
                else:
                    continue
                base_url = ""
                #pull the raw link from the result
                link = result.find('a', href=True)
                if link:
                    link = link['href']
                    parsed_url = urlparse(link)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                else:
                    continue
                #this is the full site info we wish to extract
                site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                #if the link is different from the last link
                if last_link != site_info["link"]:
                    results.append(site_info)
                    index += 1
                    last_link = link

            write_page_to_csv(f"{query}.csv", results)
            success = True

        except:
            print(f"Failed to scrape page {page}")
            print(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        print(f"Failed to scrape page {page}, no retries left")
        raise Exception(f"Max retries exceeded: {retries}")
    else:
        print(f"Scraped page {page} with {retries} retries left")


def full_search(query, pages=3, location="us", MAX_THREADS=5, MAX_RETRIES=4, num=100):
    page_numbers = list(range(pages))
    full_results = []
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        executor.map(search_page, [query]*pages, page_numbers, [location]*pages, [MAX_RETRIES]*pages, [num]*pages)

if __name__ == "__main__":

    MAX_RETRIES = 5
    RESULTS_PER_PAGE = 10
    QUERIES = ["cool stuff"]

    for query in QUERIES:
        full_search(query, pages=3, num=RESULTS_PER_PAGE)

There are a couple of things you should pay attention to here:

  • We now make our requests to the proxy url:
    • We use get_scrapeops_url() to convert regular urls into proxied ones
    • We no longer pass our location into the request to Google; instead, we use it as a query param for ScrapeOps, and ScrapeOps takes care of our location for us!

In production, you should always use a good proxy. When we use proxies, the site's server (in this case, Google) can't pin down our scraper because our requests come from many different IP addresses and locations!


Step 6: Production Run

Time for the production run. You can view the full production-level scraper below. This version of the script replaces the direct CSV writes inside search_page() with a data pipeline, and full_search() now submits each page to the thread pool and passes the pipeline along. We also added basic logging and file handling to prevent overwriting results.

Take note of the following classes: SearchData and DataPipeline. SearchData is a simpler class that basically just holds the data we're choosing to scrape. DataPipeline is where the real heavy lifting gets done.

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs, urlencode
import csv
import concurrent
from concurrent.futures import ThreadPoolExecutor
import os
import logging
import time
from dataclasses import dataclass, field, fields, asdict

#our default user agent
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#holds the data we extract for each search result
@dataclass
class SearchData:
    name: str
    base_url: str
    link: str
    page: int
    result_number: int

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())


#queues results, drops duplicates, and writes everything to a CSV file
class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_open = False

    def save_to_csv(self):
        self.csv_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename)

        with open(self.csv_filename, mode="a", newline="", encoding="UTF-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate Item Found: {input_data.name}. Item dropped")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

#convert a regular url into a ScrapeOps proxied url
def get_scrapeops_url(url):
    payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
    proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
    return proxy_url

#scrape a single page of search results and feed it to the pipeline
def search_page(query, page, location="United States", headers=headers, pipeline=None, num=100, retries=3):
    url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}"
    payload = {
        "api_key": API_KEY,
        "url": url,
    }
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            response = requests.get(get_scrapeops_url(url))
            soup = BeautifulSoup(response.text, 'html.parser')
            divs = soup.find_all("div")
            index = 0
            last_link = ""
            for div in divs:
                h3s = div.find_all("h3")
                if len(h3s) > 0:
                    link = div.find("a", href=True)
                    parsed_url = urlparse(link["href"])
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    site_info = {'title': h3s[0].text, "base_url": base_url, 'link': link["href"], "page": page, "result_number": index}
                    search_data = SearchData(
                        name=site_info["title"],
                        base_url=site_info["base_url"],
                        link=site_info["link"],
                        page=site_info["page"],
                        result_number=site_info["result_number"]
                    )
                    if site_info["link"] != last_link:
                        index += 1
                        last_link = site_info["link"]
                        if pipeline:
                            pipeline.add_data(search_data)
            success = True

        except:
            print(f"Failed to scrape page {page}")
            print(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        print(f"Failed to scrape page {page}, no retries left")
        raise Exception(f"Max retries exceeded: {retries}")
    else:
        print(f"Scraped page {page} with {retries-tries} retries left")

#open a thread pool and scrape each page concurrently
def full_search(query, pages=3, location="us", MAX_THREADS=5, MAX_RETRIES=3, num=10):
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        pipeline = DataPipeline(csv_filename=f"{query.replace(' ', '-')}.csv")
        tasks = [executor.submit(search_page, query, page, location, None, pipeline, num, MAX_RETRIES) for page in range(pages)]
        for future in tasks:
            future.result()
        pipeline.close_pipeline()

if __name__ == "__main__":
    MAX_THREADS = 5
    MAX_RETRIES = 5
    queries = ["cool stuff"]

    logger.info("Starting full search...")
    for query in queries:
        full_search(query, pages=3, num=10)
    logger.info("Search complete.")


Remember:

  • SearchData is a class that simply holds our data
  • DataPipeline does all the heavy lifting of removing duplicates and writing the data to our csv file (see the usage sketch below)
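
If you want to use these classes outside of full_search(), a minimal usage sketch looks like this, assuming the SearchData and DataPipeline definitions from the script above are already in scope (the example.csv filename and result values here are made up for illustration):

#build a pipeline that writes to example.csv
pipeline = DataPipeline(csv_filename="example.csv", storage_queue_limit=50)

#create a result and feed it to the pipeline; duplicates (by name) are dropped automatically
result = SearchData(
    name="Cool Stuff",
    base_url="https://www.example.com",
    link="https://www.example.com/cool-stuff",
    page=0,
    result_number=1
)
pipeline.add_data(result)

#flush anything left in the queue out to the CSV file
pipeline.close_pipeline()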

Whenever you scrape the web, you need to follow the terms and conditions of the site you're scraping. Always consult the robots.txt file to see what they allow. Generally, if you are scraping as a guest (not logged in), the information is considered to be public and scraping is usually alright.

You can look at Google's robots.txt here. In addition, if you're unclear about whether or not you can scrape a site, check their Terms and Conditions.

You can view Google's Terms and Conditions here. Similar to many other companies, Google reserves the right to suspend, terminate, or delete your account if they have reason to believe that you are connected to suspicious or malicious activity.

Also, do not collect and release anyone's personal data when scraping. In many countries this is illegal, and even if it is legal in your country, it's a pretty immoral thing to do. Always consider how your scraped data will be used as well. When you scrape a site from Google, some of the information you find might fall under the Terms and Conditions of that site as well.


Conclusion

Thanks for reading. You now have a decent understanding of how to:

  • Make basic HTTP requests in Python
  • Scrape Google's search results
  • Write multithreaded code in Python
  • Integrate Requests with the ScrapeOps Proxy API Aggregator

Check out the links below to learn more.


More Python Web Scraping Guides

In the mood to build something? Go do it! If you're in the mood to binge read, here at ScrapeOps, we've got a ton of guides for all sorts of fun and interesting scraping projects.

Check out The Python Web Scraping Playbook or take a look at some of the guides below!