How to Scrape Google Search With Python Requests and BeautifulSoup
When scraping the web, Google is a goldmine. You're probably already familiar with Google as a search engine, but you may be less familiar with how useful it is as a scraping target.
If you can scrape a search engine, you can essentially build your own data mining operation, and you can use it to identify other sites to scrape. In a data-driven world, this is an incredibly lucrative skill to have.
In this guide, we'll go over the following topics:
- TLDR: How to Scrape Google Search with Python Requests
- How To Architect Our Google Scraper
- Understanding How To Scrape Google Search
- Building A Google Search Scraper
- Legal and Ethical Considerations
- Conclusion
- More Web Scraping Guides
TLDR - How to Scrape Google Search With Python Requests
When scraping search results, pay attention to the following things:
- We get uniform results nested inside of `<div>` elements.
- Each result has its own `<h3>` tag.
- Each result comes with an `href` that links to a website.
```
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs, urlencode
import csv
import concurrent
from concurrent.futures import ThreadPoolExecutor
import os
import logging
import time
from dataclasses import dataclass, field, fields, asdict

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}

proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str
    base_url: str
    link: str
    page: int
    result_number: int

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_open = False

    def save_to_csv(self):
        self.csv_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename)
        with open(self.csv_filename, mode="a", encoding="UTF-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate Item Found: {input_data.name}. Item dropped")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def get_scrapeops_url(url):
    payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
    proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
    return proxy_url

def search_page(query, page, location="United States", headers=headers, pipeline=None, num=100, retries=3):
    url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}"
    payload = {
        "api_key": API_KEY,
        "url": url,
    }
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            response = requests.get(get_scrapeops_url(url))
            soup = BeautifulSoup(response.text, 'html.parser')
            divs = soup.find_all("div")
            index = 0
            last_link = ""
            for div in divs:
                h3s = div.find_all("h3")
                if len(h3s) > 0:
                    link = div.find("a", href=True)
                    parsed_url = urlparse(link["href"])
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    site_info = {'title': h3s[0].text, "base_url": base_url, 'link': link["href"], "page": page, "result_number": index}

                    search_data = SearchData(
                        name=site_info["title"],
                        base_url=site_info["base_url"],
                        link=site_info["link"],
                        page=site_info["page"],
                        result_number=site_info["result_number"]
                    )

                    if site_info["link"] != last_link:
                        index += 1
                        last_link = site_info["link"]
                        if pipeline:
                            pipeline.add_data(search_data)
            success = True
        except:
            print(f"Failed to scrape page {page}")
            print(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        print(f"Failed to scrape page {page}, no retries left")
        raise Exception(f"Max retries exceeded: {retries}")
    else:
        print(f"Scraped page {page} with {retries-tries} retries left")

def full_search(query, pages=3, location="us", MAX_THREADS=5, MAX_RETRIES=3, num=10):
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        pipeline = DataPipeline(csv_filename=f"{query.replace(' ', '-')}.csv")
        tasks = [executor.submit(search_page, query, page, location, None, pipeline, num, MAX_RETRIES) for page in range(pages)]
        for future in tasks:
            future.result()
        pipeline.close_pipeline()

if __name__ == "__main__":
    MAX_THREADS = 5
    MAX_RETRIES = 5

    queries = ["cool stuff"]

    logger.info("Starting full search...")
    for query in queries:
        full_search(query, pages=3, num=10)
    logger.info("Search complete.")
```
When running this code, you control how many pages get scraped through the `full_search()` line, e.g. `full_search(query, pages=20)`.
If you'd like to search 100 pages of boring stuff, simply change it to `full_search(query, pages=100)` and add boring stuff to your `queries` list: `["cool stuff", "boring stuff"]`.
Other things you can tweak (see the example call after this list) are:
- `location`
- `MAX_THREADS`
- `MAX_RETRIES`
- `num`
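Here is a minimal sketch of how those knobs could be passed in from the `__main__` block of the TLDR script above. The query strings and the specific values below are placeholders for illustration, not recommendations.

```
# Hypothetical tuning of the TLDR scraper's keyword arguments.
if __name__ == "__main__":
    queries = ["cool stuff", "boring stuff"]

    for query in queries:
        full_search(
            query,
            pages=5,          # number of result batches to request
            location="us",    # country code passed along to the ScrapeOps proxy
            MAX_THREADS=3,    # how many pages to scrape concurrently
            MAX_RETRIES=2,    # attempts per page before giving up
            num=10            # results requested per batch
        )
```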
If you'd like to run this code, simply copy and paste it into a Python script, e.g. `yourscript.py`. Obviously, you can name it whatever you want. Once you have your script and dependencies installed, run the following command:

```
python yourscript.py
```
This will output a CSV file containing your search results.
How To Architect Our Google Scraper
There are many different tools we can use to scrape Google Search. In this article, we'll focus on Requests and BeautifulSoup. Our scraper needs to be able to do the following:
- Perform a GET request for a Google Search
- Interpret the results
- Save each result's title, URL, and result number
For starters, we need to build a simple parser to pull the nested information from all of Google's nested divs. Once we have a parser, we'll add pagination. Next, we'll add data processing and concurrency. Afterward, we'll add a proxy. Once we're ready for the production run, we'll clean everything up a bit more and improve our data processing as well.
Remember, our scraper needs:
- Parsing
- Pagination
- Data Processing
- Concurrency
- Proxy
Understanding How To Scrape Google Search
Before plunging head first into code, we're going to talk about how our scraper works on a high level. In this section, we're going over the required steps in greater detail. If you've got some experience in web scraping already, feel free to skip this section.
Step 1: How To Request Google Search Pages
Let's start by taking a look at Google Search results manually. The screenshot below is the result of searching the term "cool stuff".
As you can see in the address bar, we send the following GET request:
https://www.google.com/search?q=cool+stuff
The actual domain we're pinging is https://www.google.com/search. At the end, you should notice `?q=cool+stuff`.
In the address bar, a question mark, `?`, marks the start of the query string. In this case we're passing the parameter `q`, and its value follows the equals sign, `=`.
So `?q=cool+stuff` means that our search query is cool stuff. If we wanted to search for boring stuff, we could instead use `?q=boring+stuff`.
In the days of old, at the bottom of our page, we would see a list of page numbers. This made search results incredibly easy to scrape.
While Google doesn't exactly give us page numbers anymore, they do give us a `start` parameter that we can use to paginate our results. We get our results in batches of 10. With variables figured in, our URL will look like this:

https://www.google.com/search?q={query}&start={page * 10}

They also give us a `num` parameter that we can use to control the number of results we get. Taking `num` into account, our URL looks more like this:

https://www.google.com/search?q={query}&start={page * num}&num={num}

We can set `num` as high as 100 results, but Google doesn't always honor it. Multiple times throughout the writing of this article, I've used `num=100` and been blocked or received fewer results. Other times I have gotten proper results.
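To make those query parameters concrete, here is a small sketch of how the URL could be assembled in Python. The helper name `build_search_url` and the example values are just for illustration; the actual scraper in this guide builds the URL inline with an f-string.

```
from urllib.parse import urlencode

# Illustrative helper: build a Google Search URL from a query, page, and batch size.
def build_search_url(query, page=0, num=10):
    params = {
        "q": query,            # the search term
        "start": page * num,   # offset of the first result in this batch
        "num": num,            # how many results to ask for (Google may ignore this)
    }
    return "https://www.google.com/search?" + urlencode(params)

# urlencode() also handles spaces for us: "cool stuff" becomes "cool+stuff"
print(build_search_url("cool stuff", page=2, num=10))
# https://www.google.com/search?q=cool+stuff&start=20&num=10
```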
Step 2: How To Extract Data From Google Search
As you saw in the screenshot earlier, each of our results comes with an `<h3>` tag. To find our results, we can simply use BeautifulSoup's `.find_all()` method. Some websites like to nest a bunch of different things inside of an element, and Google is no exception.
Here is the full HTML of our first result: `<h3 class="LC20lb MBeuO DKV0Md">Cool Stuff</h3>`. As you can see, the class name is a bunch of jumbled garbage and there is no link within the tag! This is because Google (like many other sites) nests all of our important information within a `<div>`.
If the class name of each result were legible and not subject to change, I would recommend using it to parse the result. Since the class name is likely to change over time, we're simply going to get all of the `<div>` elements and find the `<h3>` elements nested inside of them. We'll use `soup.find_all()` and we'll keep a `last_link` variable.
For each result we get, we'll compare its link to the last link. If the current link is the same as the last link, we'll ignore this element and move on to the next one.
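Here is a bare-bones sketch of that extraction and deduplication idea, assuming `html` already holds the raw HTML of a results page (however you fetched it). It only collects titles and links; the full scraper later in this guide adds retries, CSV storage, and a proxy.

```
from bs4 import BeautifulSoup
from urllib.parse import urlparse

# `html` is assumed to be the raw HTML of a Google results page you already fetched.
def extract_results(html):
    soup = BeautifulSoup(html, "html.parser")
    results = []
    last_link = ""
    for div in soup.find_all("div"):
        h3 = div.find("h3")              # result titles live in <h3> tags
        link = div.find("a", href=True)  # the link lives on an <a> inside a parent <div>
        if not h3 or not link:
            continue
        href = link["href"]
        if href == last_link:
            continue                     # skip nested duplicates of the same result
        last_link = href
        parsed = urlparse(href)
        results.append({
            "title": h3.text,
            "base_url": f"{parsed.scheme}://{parsed.netloc}",
            "link": href,
        })
    return results
```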
Step 3: How To Control Pagination
As we touched on briefly earlier, Google no longer gives us real page numbers. What they do give us is a result offset. Our results tend to come in batches of 10, so it's quite easy to treat our data in a page-like fashion.
Take a closer look at our URL format again:
https://www.google.com/search?q={query}&start={page * 10}
We simply multiply our page number by 10. Using this method, we'll fetch results 1 through 10, then 11 through 20, and so on and so forth. In testing, Google has occasionally given us up to 12 results per batch, but that's OK. Even if we get duplicates, we can remove them when we handle the data.
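As a quick sanity check, here's the offset math for the first few pages, assuming batches of 10. This is just illustrative arithmetic, not part of the scraper.

```
# Page number -> `start` offset, assuming 10 results per batch.
for page in range(3):
    start = page * 10
    print(f"page {page}: start={start} (results {start + 1}-{start + 10})")
# page 0: start=0 (results 1-10)
# page 1: start=10 (results 11-20)
# page 2: start=20 (results 21-30)
```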
Step 4: Geolocated Data
Some websites will return different results depending on our location. Once again, Google is no exception. To add a location, we can simply add a `geo_location` parameter to our request.
At the moment, our full request looks like this:
https://www.google.com/search?q={query}&start={page * 10}
Here is what it looks like with our location added:
https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}
All in all, it's a pretty simple change.
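Here's a small sketch of both forms side by side. Note that later in this guide, once we route requests through the ScrapeOps proxy, the location is no longer appended to the Google URL at all; it's passed to the proxy as the `country` parameter instead. The query, page, and location values below are placeholders.

```
from urllib.parse import urlencode

query = "cool stuff"
page = 0
location = "United States"

# Location appended directly to the Google URL (the form used before we add a proxy)
plain_url = f"https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}"

# Location handled by the ScrapeOps proxy instead, via the `country` parameter
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
proxied_url = "https://proxy.scrapeops.io/v1/?" + urlencode({
    "api_key": API_KEY,
    "url": f"https://www.google.com/search?q={query}&start={page * 10}",
    "country": "us",
})

print(plain_url)
print(proxied_url)
```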
Setting Up Our Google Scraper Project
Now that you've got a basic understanding of the process, it's time to begin setting up our project. We'll start by creating a new project folder. We can call it `google-search-requests`.
You can create a new folder through your file explorer or enter the following command:

```
mkdir google-search-requests
```

Next, we need to create a virtual environment. I'll be using `python3.10-venv`.
First, we'll create a new virtual environment.
Linux/Mac:

```
python3 -m venv google-search
```

Windows:

```
python -m venv google-search
```

Once we've got our new environment created, let's activate it.
Linux/Mac:

```
source google-search/bin/activate
```

Windows:

```
.\google-search\Scripts\Activate.ps1
```

Once your venv is activated, it's time to install our individual dependencies. This command will install both `requests` and `beautifulsoup4`:

```
pip install requests beautifulsoup4
```
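If you'd like to confirm the install before moving on, a quick import check like the one below should print both library versions without raising an error. This step is optional and not part of the original setup.

```
python -c "import requests, bs4; print(requests.__version__, bs4.__version__)"
```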
Step 1: Create Simple Search Data Parser
We've already been through our base logic. Let's create an initial script that we can build from.
```
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs, urlencode

#search a single page
def google_search(query, retries=3):
    tries = 0
    #runtime loop for the scrape
    while tries <= retries:
        try:
            url = f"https://www.google.com/search?q={query}"
            response = requests.get(url)
            results = []
            last_link = ""
            soup = BeautifulSoup(response.text, 'html.parser')
            index = 0
            for result in soup.find_all('div'):
                title = result.find('h3')
                if title:
                    title = title.text
                else:
                    continue
                base_url = ""
                link = result.find('a', href=True)
                if link:
                    link = link['href']
                    parsed_url = urlparse(link)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                else:
                    continue
                #this is the full site info we wish to extract
                site_info = {'title': title, "base_url": base_url, 'link': link, "result_number": index}
                #only keep the result if its link differs from the last one we saw
                if last_link != site_info["link"]:
                    results.append(site_info)
                    index += 1
                    last_link = link
            #return our list of results
            print(f"Finished scrape with {tries} retries")
            return results
        except:
            print("Failed to scrape the page")
            print("Retries left:", retries-tries)
            tries += 1
    #if this line executes, the scrape has failed
    raise Exception(f"Max retries exceeded: {retries}")

if __name__ == "__main__":
    MAX_RETRIES = 5
    QUERIES = ["cool stuff"]

    for query in QUERIES:
        results = google_search(query, retries=MAX_RETRIES)
        for result in results:
            print(result)
```
In this example, we:
- Create a `google_search()` function that takes our query as a parameter
- When we get the result, `BeautifulSoup(response.text, 'html.parser')` creates a `BeautifulSoup` instance to parse through the HTML
- `soup.find_all("div")` finds all the `<div>` objects
- `result.find("h3")` is used to find the header element of each result
- `link = result.find('a', href=True)` extracts the link from the result
- `urlparse(link)` parses our link
- `base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"` reconstructs the `base_url` so we can save it
- We then create a `dict`, `site_info`, from the data we've extracted
- If the `link` from `site_info` is different than `last_link`, we add our result to the `results` list
- After parsing through the response and creating our list, we return the `results` list
Step 2: Add Pagination
Now that our base parsing logic is in place, let's add pagination so we can scrape multiple pages of results.
```
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs

def google_search(query, pages=3, location="United States", retries=3):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    results = []
    last_link = ""
    for page in range(0, pages):
        tries = 0
        success = False
        while tries <= retries and not success:
            try:
                url = f"https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}"
                response = requests.get(url, headers=headers)
                soup = BeautifulSoup(response.text, 'html.parser')
                index = 0
                for result in soup.find_all('div'):
                    title = result.find('h3')
                    if title:
                        title = title.text
                    else:
                        continue
                    base_url = ""
                    #pull the raw link from the result
                    link = result.find('a', href=True)
                    if link:
                        link = link['href']
                        parsed_url = urlparse(link)
                        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    else:
                        continue
                    #this is the full site info we wish to extract
                    site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                    #if the link is different from the last link
                    if last_link != site_info["link"]:
                        results.append(site_info)
                        index += 1
                        last_link = link
                print(f"Scraped page {page} with {retries} retries left")
                success = True
            except:
                print(f"Failed to scrape page {page}")
                print(f"Retries left: {retries-tries}")
                tries += 1
        if not success:
            raise Exception(f"Max retries exceeded: {retries}")
    return results

if __name__ == "__main__":
    MAX_RETRIES = 5
    QUERIES = ["cool stuff"]

    for query in QUERIES:
        results = google_search(query, retries=MAX_RETRIES)
        for result in results:
            print(result)
```
There's a lot going on in the code above. While it's not much different from our initial prototype, there is one really important part to pay attention to here: `&start={page * 10}`. This is the basis for how we batch our results. We also add in functionality for our `geo_location`, but by the time we add our proxy, this functionality will be moved elsewhere in our code.
Step 3: Storing the Scraped Data
As you probably noticed in our earlier examples, we store each object as a `dict` with key-value pairs. Here is the first result so you can see how the data is laid out:

{'title': 'Cool Stuff', 'base_url': '://', 'link': '/search?sca_esv=3d5aec0ebbda9031&q=cool+stuff&uds=AMwkrPusHYa-Y5lqXPwpg8jJI99FKYz2zi9dec3bfM0lH-hil3eHKWSsmwBdtnNX2uzO7rvzH_UOAG-8W6q5RMgyj5EtPQRweAkj97b7yv-dxhFjVNmTpUmjIG8LX5BTVMn1i8RvhFDaroRDPKXSl9mGzRdmu5ujMGh35B6t9hZQe5OWf6qF9qyxdHJPailq0Was2Ti5R1Efg6G0TWkZl8Q0a4QgLEUcLEh8uM-Gr_AIA73YM8e13Y_Y5x_btmkZoDODrensXIErfUplY9wGJ9in8N6PV9WQjCg77wu2IOm5pmE8706LnWQ&udm=2&prmd=isvnmbtz&sa=X&ved=2ahUKEwi459DNvrOFAxXzh1YBHfFMDlsQtKgLegQIEhAB', 'page': 0, 'result_number': 0}

Each object has a `title`, `base_url`, `link`, `page`, and `result_number`. Because we have uniform data stored in key-value pairs, we already have the makings of a `DataFrame` and therefore a CSV.
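As a quick aside, that uniform structure means the results (or the finished CSV) drop straight into a pandas `DataFrame` if that's how you prefer to explore data. This sketch assumes you've installed pandas separately; it isn't a dependency of the scraper in this guide, and the filename follows the `f"{query}.csv"` pattern used below.

```
import pandas as pd

# Load the CSV produced by the scraper (e.g. the "cool stuff" query writes "cool stuff.csv").
df = pd.read_csv("cool stuff.csv")

# Or build a DataFrame directly from the list of result dicts:
# df = pd.DataFrame(results)

print(df[["title", "base_url", "page", "result_number"]].head())
```

For the scraper itself, though, we'll stick with the standard library's `csv` module.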
To our imports, add the following line:

```
import csv
```
Then, update the script to look like this:
```
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs
import csv
from os import path

def write_page_to_csv(filename, object_array):
    path_to_csv = filename
    file_exists = path.exists(filename)

    with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file:
        #name the headers after our object keys
        writer = csv.DictWriter(file, fieldnames=object_array[0].keys())
        if not file_exists:
            writer.writeheader()
        writer.writerows(object_array)

def google_search(query, pages=3, location="United States", retries=3):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    last_link = ""
    for page in range(0, pages):
        #collect results for this page only, so each page gets written to the CSV once
        results = []
        tries = 0
        success = False
        while tries <= retries and not success:
            try:
                url = f"https://www.google.com/search?q={query}&start={page * 10}"
                response = requests.get(url, headers=headers)
                print(f"Response Code: {response.status_code}")
                soup = BeautifulSoup(response.text, 'html.parser')
                index = 0
                for result in soup.find_all('div'):
                    title = result.find('h3')
                    if title:
                        title = title.text
                    else:
                        continue
                    base_url = ""
                    #pull the raw link from the result
                    link = result.find('a', href=True)
                    if link:
                        link = link['href']
                        parsed_url = urlparse(link)
                        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    else:
                        continue
                    #this is the full site info we wish to extract
                    site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                    #if the link is different from the last link
                    if last_link != site_info["link"]:
                        results.append(site_info)
                        index += 1
                        last_link = link
                print(f"Scraped page {page} with {retries} retries left")
                write_page_to_csv(f"{query}.csv", results)
                success = True
            except:
                print(f"Failed to scrape page {page}")
                print(f"Retries left: {retries-tries}")
                tries += 1
        if not success:
            raise Exception(f"Max retries exceeded: {retries}")

if __name__ == "__main__":
    MAX_RETRIES = 5
    QUERIES = ["cool stuff"]

    for query in QUERIES:
        google_search(query, retries=MAX_RETRIES)
```
Pay close attention to the `write_page_to_csv()` function:
```
def write_page_to_csv(filename, object_array):
    path_to_csv = filename
    file_exists = path.exists(filename)

    with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file:
        #name the headers after our object keys
        writer = csv.DictWriter(file, fieldnames=object_array[0].keys())
        if not file_exists:
            writer.writeheader()
        writer.writerows(object_array)
```
- The function above takes in an `object_array` (in this case our page results) and writes it to our `filename`.
- If the file doesn't already exist, we create it and write the header row. If it does exist, we simply open it and append to it.
- This allows us to put multiple page results into the same file without corrupting it.
- When scraping at scale, we need to be able to scrape multiple pages of results and put them into the same file... This is the whole purpose of scraping to begin with... collecting data!
Here is a screenshot of the resulting file:
This is almost identical to our previous iteration, with some small differences at the end:
- We do not print our results
- We instead write each list of page results to a CSV file

It's important to append to the CSV file as soon as we have our results. If our scraper crashes halfway through the job, we still get some data. It's also very important to open this file in `append` mode so we don't overwrite any important data that we've scraped previously.
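To see why append mode matters, here is a tiny standalone demonstration that calls `write_page_to_csv()` (from the script above) twice with made-up rows; the second call adds rows instead of wiping out the first batch. The sample data below is purely illustrative.

```
# Quick demonstration: two calls append to the same file instead of overwriting it.
page_one = [{"title": "Example A", "base_url": "https://example.com",
             "link": "https://example.com/a", "page": 0, "result_number": 0}]
page_two = [{"title": "Example B", "base_url": "https://example.com",
             "link": "https://example.com/b", "page": 1, "result_number": 0}]

write_page_to_csv("demo.csv", page_one)
write_page_to_csv("demo.csv", page_two)
# demo.csv now holds one header row plus both result rows.
```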
Step 4: Adding Concurrency
Now that we've got a working model from start to finish, let's focus on performance! We're going to split our `google_search()` function into two separate functions, `search_page()` and `full_search()`. `search_page()` will search a single page and `full_search()` will create multiple threads that call `search_page()` concurrently.
Add the following `import` statement:

```
from concurrent.futures import ThreadPoolExecutor
```

Now we'll refactor our `google_search()` function into our `search_page()` function.
```
def search_page(query, page, location="United States", retries=3, num=100):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    results = []
    last_link = ""
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}"
            response = requests.get(url, headers=headers)
            if response.status_code != 200:
                print("Failed server response", response.status_code)
                raise Exception("Failed server response!")
            print(f"Response Code: {response.status_code}")
            soup = BeautifulSoup(response.text, 'html.parser')
            index = 0
            for result in soup.find_all('div'):
                title = result.find('h3')
                if title:
                    title = title.text
                else:
                    continue
                base_url = ""
                #pull the raw link from the result
                link = result.find('a', href=True)
                if link:
                    link = link['href']
                    parsed_url = urlparse(link)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                else:
                    continue
                #this is the full site info we wish to extract
                site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                #if the link is different from the last link
                if last_link != site_info["link"]:
                    results.append(site_info)
                    index += 1
                    last_link = link
            write_page_to_csv(f"{query}.csv", results)
            success = True
        except:
            print(f"Failed to scrape page {page}")
            print(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        print(f"Failed to scrape page {page}, no retries left")
        raise Exception(f"Max retries exceeded: {retries}")
    else:
        print(f"Scraped page {page} with {retries} retries left")
```
This function:
- Removes the `pages` argument and replaces it with `page`
- Instead of running a `for` loop and iterating through pages, simply executes our parsing logic on the single `page` we're searching
Next, we'll create a `full_search()` function:
```
def full_search(query, pages=3, location="United States", MAX_THREADS=5, MAX_RETRIES=4, num=100):
    page_numbers = list(range(pages))
    full_results = []
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        #every argument list must be the same length as page_numbers, otherwise map() stops at the shortest one
        executor.map(search_page, [query]*pages, page_numbers, [location]*pages, [MAX_RETRIES]*pages, [num]*pages)
```
While it may look a bit intimidating, this function is actually rather simple:
- It takes one real argument, `query`. Everything else is a `kwarg` used to tweak our settings.
- The scariest looking portion of this code is `executor.map()`. As bizarre as it looks, it's actually pretty simple.
- It takes `search_page` as the first argument, and the rest of the arguments are lists of parameters that get passed into `search_page()`, one item per page. Every list needs to be the same length as `page_numbers`; otherwise `map()` stops at the shortest one.

This function is super important: it allows us to use a single thread for each page. When doing this, we can scrape multiple pages at the same time.
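If `executor.map()` feels too magical, an equivalent approach (and the one the production version of this scraper ends up using) is to call `executor.submit()` in a comprehension and then wait on the futures. This sketch assumes the `search_page()` function defined above is in scope.

```
from concurrent.futures import ThreadPoolExecutor

def full_search(query, pages=3, location="United States", MAX_THREADS=5, MAX_RETRIES=4, num=100):
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        # one future per page; submit() passes the arguments positionally to search_page()
        futures = [
            executor.submit(search_page, query, page, location, MAX_RETRIES, num)
            for page in range(pages)
        ]
        # calling result() waits for each page and re-raises any exception from a worker thread
        for future in futures:
            future.result()
```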
At this point, our full scraper should look like this:
```
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs
import csv
from os import path
from concurrent.futures import ThreadPoolExecutor

def write_page_to_csv(filename, object_array):
    path_to_csv = filename
    file_exists = path.exists(filename)

    with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file:
        #name the headers after our object keys
        writer = csv.DictWriter(file, fieldnames=object_array[0].keys())
        if not file_exists:
            writer.writeheader()
        writer.writerows(object_array)

def search_page(query, page, location="United States", retries=3, num=100):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    results = []
    last_link = ""
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}"
            response = requests.get(url, headers=headers)
            if response.status_code != 200:
                print("Failed server response", response.status_code)
                raise Exception("Failed server response!")
            print(f"Response Code: {response.status_code}")
            soup = BeautifulSoup(response.text, 'html.parser')
            index = 0
            for result in soup.find_all('div'):
                title = result.find('h3')
                if title:
                    title = title.text
                else:
                    continue
                base_url = ""
                #pull the raw link from the result
                link = result.find('a', href=True)
                if link:
                    link = link['href']
                    parsed_url = urlparse(link)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                else:
                    continue
                #this is the full site info we wish to extract
                site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                #if the link is different from the last link
                if last_link != site_info["link"]:
                    results.append(site_info)
                    index += 1
                    last_link = link
            write_page_to_csv(f"{query}.csv", results)
            success = True
        except:
            print(f"Failed to scrape page {page}")
            print(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        print(f"Failed to scrape page {page}, no retries left")
        raise Exception(f"Max retries exceeded: {retries}")
    else:
        print(f"Scraped page {page} with {retries} retries left")

def full_search(query, pages=3, location="United States", MAX_THREADS=5, MAX_RETRIES=4, num=100):
    page_numbers = list(range(pages))
    full_results = []
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        #every argument list must be the same length as page_numbers
        executor.map(search_page, [query]*pages, page_numbers, [location]*pages, [MAX_RETRIES]*pages, [num]*pages)

if __name__ == "__main__":
    MAX_RETRIES = 5
    QUERIES = ["cool stuff"]

    for query in QUERIES:
        full_search(query, pages=1)
```
If you run this code, you'll probably get output similar to the image below. The reason for this is actually really simple. Our scraper was already faster than a normal human, and we just sped it up even more! Google recognizes the fact that our scraper doesn't appear human at all, so they block us.
Step 5: Bypassing Anti-Bots
As you probably recall from the last section, now that our requests are coming in really fast, we get blocked. The ScrapeOps Proxy is perfect for addressing this issue.
With the ScrapeOps Proxy, we get rotating IP addresses, and we also have a middleman server. Because we're communicating with a server in the middle, this also slows down the rate at which we make our requests. In short, our requests are spaced apart, and each one comes from a different IP address. This makes it nearly impossible to identify and block our scraper.
In the code below, we create a simple function, `get_scrapeops_url()`. It's a really simple function that just performs some basic string formatting for us, but it is vital to our scraper. We now have the ability to convert any URL into a proxied URL with very minimal impact on our overall code. With this function, we can now run our Python script without getting blocked!
```
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs, urlencode
import csv
from os import path
from concurrent.futures import ThreadPoolExecutor

#our default user agent
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}

proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

def get_scrapeops_url(url, location='us'):
    payload = {'api_key': API_KEY, 'url': url, 'country': location}
    proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
    return proxy_url

def write_page_to_csv(filename, object_array):
    path_to_csv = filename
    file_exists = path.exists(filename)

    with open(path_to_csv, mode="a", newline="", encoding="UTF-8") as file:
        #name the headers after our object keys
        writer = csv.DictWriter(file, fieldnames=object_array[0].keys())
        if not file_exists:
            writer.writeheader()
        writer.writerows(object_array)

def search_page(query, page, location="United States", retries=3, num=100):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}
    results = []
    last_link = ""
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}"
            response = requests.get(get_scrapeops_url(url), headers=headers)
            if response.status_code != 200:
                print("Failed server response", response.status_code)
                raise Exception("Failed server response!")
            print(f"Response Code: {response.status_code}")
            soup = BeautifulSoup(response.text, 'html.parser')
            index = 0
            for result in soup.find_all('div'):
                title = result.find('h3')
                if title:
                    title = title.text
                else:
                    continue
                base_url = ""
                #pull the raw link from the result
                link = result.find('a', href=True)
                if link:
                    link = link['href']
                    parsed_url = urlparse(link)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                else:
                    continue
                #this is the full site info we wish to extract
                site_info = {'title': title, "base_url": base_url, 'link': link, "page": page, "result_number": index}
                #if the link is different from the last link
                if last_link != site_info["link"]:
                    results.append(site_info)
                    index += 1
                    last_link = link
            write_page_to_csv(f"{query}.csv", results)
            success = True
        except:
            print(f"Failed to scrape page {page}")
            print(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        print(f"Failed to scrape page {page}, no retries left")
        raise Exception(f"Max retries exceeded: {retries}")
    else:
        print(f"Scraped page {page} with {retries} retries left")

def full_search(query, pages=3, location="us", MAX_THREADS=5, MAX_RETRIES=4, num=100):
    page_numbers = list(range(pages))
    full_results = []
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        executor.map(search_page, [query]*pages, page_numbers, [location]*pages, [MAX_RETRIES]*pages, [num]*pages)

if __name__ == "__main__":
    MAX_RETRIES = 5
    RESULTS_PER_PAGE = 10
    QUERIES = ["cool stuff"]

    for query in QUERIES:
        full_search(query, pages=3, num=RESULTS_PER_PAGE)
```
There are a couple of things you should pay attention to here:
- We now make our requests to the `proxy_url` instead of directly to Google:
  - We use `get_scrapeops_url()` to convert regular URLs into proxied ones
  - We no longer pass our location in a request to Google; we use it as a query param for ScrapeOps... ScrapeOps will take care of our location for us!

In production, you should always use a good proxy. When we use proxies, the site server (in this case Google) can't pin down our location, because all of our requests are coming from all over the place!
Step 6: Production Run
Time for the production run. You can view the full production-level scraper below. This version of the script completely removes the return value from the `search_page()` function; instead of returning results, each page feeds its data into a pipeline, and `full_search()` submits one task per page to the `ThreadPoolExecutor`. We also added basic logging and file handling to prevent overwriting results.
Take note of the following classes: `SearchData` and `DataPipeline`. `SearchData` is a simpler class that basically just holds the data we're choosing to scrape. `DataPipeline` is where the real heavy lifting gets done.
```
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs, urlencode
import csv
import concurrent
from concurrent.futures import ThreadPoolExecutor
import os
import logging
import time
from dataclasses import dataclass, field, fields, asdict

#our default user agent
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.3'}

proxy_url = "https://proxy.scrapeops.io/v1/"
API_KEY = "YOUR-SUPER-SECRET-API-KEY"

## Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
    name: str
    base_url: str
    link: str
    page: int
    result_number: int

    def __post_init__(self):
        self.check_string_fields()

    def check_string_fields(self):
        for field in fields(self):
            if isinstance(getattr(self, field.name), str):
                if getattr(self, field.name) == "":
                    setattr(self, field.name, f"No {field.name}")
                    continue
                value = getattr(self, field.name)
                setattr(self, field.name, value.strip())

class DataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=50):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_open = False

    def save_to_csv(self):
        self.csv_open = True
        data_to_save = []
        data_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not data_to_save:
            return

        keys = [field.name for field in fields(data_to_save[0])]
        file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename)
        with open(self.csv_filename, mode="a", encoding="UTF-8") as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for item in data_to_save:
                writer.writerow(asdict(item))
        self.csv_open = False

    def is_duplicate(self, input_data):
        if input_data.name in self.names_seen:
            logger.warning(f"Duplicate Item Found: {input_data.name}. Item dropped")
            return True
        self.names_seen.append(input_data.name)
        return False

    def add_data(self, scraped_data):
        if self.is_duplicate(scraped_data) == False:
            self.storage_queue.append(scraped_data)
            if len(self.storage_queue) >= self.storage_queue_limit and self.csv_open == False:
                self.save_to_csv()

    def close_pipeline(self):
        if self.csv_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()

def get_scrapeops_url(url):
    payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
    proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
    return proxy_url

def search_page(query, page, location="United States", headers=headers, pipeline=None, num=100, retries=3):
    url = f"https://www.google.com/search?q={query}&start={page * num}&num={num}"
    payload = {
        "api_key": API_KEY,
        "url": url,
    }
    tries = 0
    success = False
    while tries <= retries and not success:
        try:
            response = requests.get(get_scrapeops_url(url))
            soup = BeautifulSoup(response.text, 'html.parser')
            divs = soup.find_all("div")
            index = 0
            last_link = ""
            for div in divs:
                h3s = div.find_all("h3")
                if len(h3s) > 0:
                    link = div.find("a", href=True)
                    parsed_url = urlparse(link["href"])
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    site_info = {'title': h3s[0].text, "base_url": base_url, 'link': link["href"], "page": page, "result_number": index}

                    search_data = SearchData(
                        name=site_info["title"],
                        base_url=site_info["base_url"],
                        link=site_info["link"],
                        page=site_info["page"],
                        result_number=site_info["result_number"]
                    )

                    if site_info["link"] != last_link:
                        index += 1
                        last_link = site_info["link"]
                        if pipeline:
                            pipeline.add_data(search_data)
            success = True
        except:
            print(f"Failed to scrape page {page}")
            print(f"Retries left: {retries-tries}")
            tries += 1
    if not success:
        print(f"Failed to scrape page {page}, no retries left")
        raise Exception(f"Max retries exceeded: {retries}")
    else:
        print(f"Scraped page {page} with {retries-tries} retries left")

def full_search(query, pages=3, location="us", MAX_THREADS=5, MAX_RETRIES=3, num=10):
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        pipeline = DataPipeline(csv_filename=f"{query.replace(' ', '-')}.csv")
        tasks = [executor.submit(search_page, query, page, location, None, pipeline, num, MAX_RETRIES) for page in range(pages)]
        for future in tasks:
            future.result()
        pipeline.close_pipeline()

if __name__ == "__main__":
    MAX_THREADS = 5
    MAX_RETRIES = 5

    queries = ["cool stuff"]

    logger.info("Starting full search...")
    for query in queries:
        full_search(query, pages=3, num=10)
    logger.info("Search complete.")
```
Remember:
- `SearchData` is a class that simply holds our data
- `DataPipeline` does all the heavy lifting of removing duplicates and writing the data to our CSV file
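If you'd like to get a feel for those two classes in isolation, here is a small sketch of feeding a couple of hand-made `SearchData` items through a `DataPipeline` outside of the scraper. The values are placeholders, and the classes are assumed to come from the script above.

```
# Stand-alone exercise of the storage classes defined in the production script.
pipeline = DataPipeline(csv_filename="pipeline-demo.csv", storage_queue_limit=10)

pipeline.add_data(SearchData(name="Example Result", base_url="https://example.com",
                             link="https://example.com/page", page=0, result_number=0))
# Duplicate names get dropped with a warning instead of being written twice.
pipeline.add_data(SearchData(name="Example Result", base_url="https://example.com",
                             link="https://example.com/page", page=0, result_number=1))

# Flush anything still sitting in the queue out to pipeline-demo.csv.
pipeline.close_pipeline()
```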
Legal and Ethical Considerations
Whenever you scrape the web, you need to follow the terms and conditions of the site you're scraping. Always consult the `robots.txt` file to see what they allow. Generally, if you are scraping as a guest (not logged in), the information is considered to be public and scraping is usually alright.
You can look at Google's `robots.txt` here. In addition, if you're unclear about whether or not you can scrape a site, check their Terms and Conditions.
You can view Google's Terms and Conditions here. Similar to many other companies, Google reserves the right to suspend, terminate, or delete your account if they have reason to believe that you are connected to suspicious or malicious activity.
Also, do not collect and release anyone's personal data when scraping. In many countries this is illegal, and even if it is legal in your country, it's a pretty immoral thing to do. Always consider how your scraped data will be used as well. When you scrape a site from Google, some of the information you find might fall under the Terms and Conditions of that site as well.
Conclusion
Thanks for reading. You now have a decent understanding of how to:
- Make basic HTTP requests in Python
- Scrape Google's search results
- Write multithreaded code in Python
- Integrate Requests using the ScrapeOps Proxy API Aggregator
Check out the links below to learn more.
More Python Web Scraping Guides
In the mood to build something? Go do it! If you're in the mood to binge read, here at ScrapeOps, we've got a ton of guides for all sorts of fun and interesting scraping projects.
Check out The Python Web Scraping Playbook or take a look at some of the guides below!