Skip to main content

Scrape Google Search Results With Selenium

How to Scrape Google Search Results With Selenium

Google is the most used search engine in the entire world. If you're reading this article, you probably found it on Google. You probably also use Google multiple times per day without even thinking about it. Any time anybody wants to look anything up, they almost always use Google.

What if you could harness Google Search results for web scraping? You can!!!

Not only can you integrate Google Search into your own scraper, but doing so lays the bedrock for creating your own crawler! In today's data-driven world, this is a priceless skill to have in your scraping toolbox.

In this extensive guide, we'll take you through how to scrape Google Search Results using Selenium.


TLDR: How to Scrape Google Search with Selenium

When we scrape Google Search, we get results from all over the web. Here is a production ready scraper already built to use the ScrapeOps Proxy API Aggregator.

This gives us access to results from all over the web and also gives us the beginning of a much larger crawler.

from selenium import webdriver
from selenium.webdriver.common.by import By
from time import sleep
import csv
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlencode
import os
import logging
from dataclasses import dataclass, field, fields, asdict

#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")

API_KEY = "YOUR-SUPER-SECRET-API-KEY"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str
link: str
result_number: int
page_number: int

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
self.data_to_save = []
self.data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not self.data_to_save:
return
keys = [field.name for field in fields(self.data_to_save[0])]

file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="UTF-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in self.data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

def get_scrapeops_url(url):
payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
return proxy_url

#this function performs a search and parses the results
def search_page(query, page, location):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(get_scrapeops_url(f"https://www.google.com/search?q={query}&start={page * 10}"))
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#last link
last_link = ""
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
link = div.find_elements(By.CSS_SELECTOR, "a")
if len(title) > 0 and len(link) > 0:
#result number on the page
result_number = index
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page}
if site_info["link"] != last_link:
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#update the last link
last_link = site_info["link"]
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#list of page numbers
page_numbers = list(range(0, pages))
#open with a max of 5 threads
with ThreadPoolExecutor(max_workers=5) as executor:
#call search page, pass all the following aruments into it
future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
#for each thread result
for page_result in future_results:
#add it to the full_results
full_results.extend(page_result)
#return the finalized list
return full_results

if __name__ == "__main__":

logger.info("Starting scrape")
data_pipeline = DataPipeline(csv_filename="production-search.csv")

search_results = full_search("cool stuff")

for result in search_results:
search_data = SearchData(name=result["title"], link=result["link"], result_number=result["result_number"] , page_number=result["page"])
data_pipeline.add_data(search_data)

data_pipeline.close_pipeline()
logger.info("Scrape Complete")
  • To run this scraper, simply paste the code into a file and enter python your-script.py
  • In order to change the query, simply replace the words "cool stuff" with whatever you'd like to query
  • If you'd like to change the result count, simply change the pages kwarg
  • To run a search of 100 pages, you would do full_search("boring stuff", pages=100)

How To Architect Our Google Scraper

When scraping search results from Google, we need to be able to do the following:

  1. Create a Google Search.
  2. Interpret the results.
  3. Fetch more results.
  4. Repeat steps 2 and 3 until we have our desired data.

Our best implementation of a Google Scraper will be able to parse a page. It also needs to manage pagination. It should be able to perform tasks with concurrency. It should also be set up to work with a proxy.

Why does our scraper need these qualities?

  • To extract data from a page, we need to parse the HTML.
  • To request different pages (batches of data), we need to control our pagination.
  • When parsing our data concurrently, our scraper will complete tasks quicker because multiple things are happening at the same time.
  • When we use a proxy, we greatly decrease our chances of getting blocked, and we can also choose our location much more successfully because the proxy will give us an IP address matching the location we choose.

Let's get started building our scraper.

First, we're going to simply parse and extract data from a Google Search result. Once we can handle a single page, we'll add support for pagination.

Then, we'll learn how to store this data in a CSV file. At this point, we'll technically have a working scraper from start to finish, so we'll focus on making improvements by adding concurrency for speed and efficiency.

Finally, we'll add proxy support to make our scraper stable and reliable.


Step 1: How To Request Google Search Pages

Take a look at the image below:

Google SERP results

If you look at the address bar, you should see:

https://www.google.com/search?q=cool+stuff

Let's break this down:

  • Our base domain is https://www.google.com
  • The endpoint we want from the domain is /search
  • ?q=cool+stuff represents the query we're making:
    • ? denotes the query
    • q is the value that we're querying
    • cool+stuff is equivalent to the string, "cool stuff"...+ denotes a space in the words

As you might have noticed in the screenshot earlier, each of our search results comes with an <h3> tag, so this is a good place to look. If you choose to inspect the page further, you'll come to notice that each of these headers is deeply nested inside a number of <div> tags.

To find our results, we need to find all the div elements containing these h3 elements. If we properly identify and parse each div, we can extract all of the relevant information from it.


Step 3: How To Control Pagination

As mentioned previously, ? denotes a query. We can actually add other query parameters using &. Google typically gives us results in batches of 10. With this in mind, we can actually request multiple "pages" by passing in a start query.

After the the start parameter is added, our formatted url looks like this:


'https://www.google.com/search?q={query}&start={page * 10}'

We pass our page number multiplied by 10 because of the way our results get delivered. If we want to start at 0, our start would be {0 * 10}. The next batch of results would be {1 * 10}. Then {2 * 10} and so on and so forth.


Step 4: Geolocated Data

Speaking of query params, we can also add one for location. If we add the geo_location parameter to our query, we can actually get results based on that individual location.

Now, our formatted url would look like this:


'https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}'

While this is an extremely small change, this gives us the power to drastically change our results.


Setting Up Our Google Scraper Project

Now that we understand the basic strategy that our scraper needs to execute, let's get started on building it!

We'll start by creating a new project folder. You can do this from within your file explorer or run the command below.

mkdir google-search

From within the project folder, we want to create a new virtual environment.

Linux/Mac

python3 -m venv google-search

Windows

python -m venv google-search

One we've got our new environment created, let's activate it:

Linux/Mac

source google-search/bin/activate

Windows

.\google-search\Scripts\Activate.ps1

Once our environment has been activated, it's time to install dependencies. We can install Selenium through pip.

pip install selenium

You will also need to ensure that you have Chrome and webdriver installed. You can check your version of Chrome with the following command:

google-chrome --version

It should output a result similar to this:

Google Chrome 123.0.6312.105 

Once you know what version of Chrome you're using, you can head on over to https://chromedriver.chromium.org/ and get the version matching it.

If you are using an older version of Chrome, you may have to update your driver more often. Chromedriver 115 and above tend to have some automated webdriver updates which makes dependency management a bit easier.


Building A Google Search Scraper

As we know, our scraper needs to be able to make custom requests in this format:


'https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}'

Now let's begin building a Selenium scraper that can handle this. Our scraper needs to operate in the following steps:

  1. Launch a headless browser
  2. Get a page of results
  3. Interpret the results
  4. Repeat steps 2 and 3 until we have our desired data
  5. Save the data
  6. Close the browser and exit the program

Step 1: Create Simple Search Data Parser

Let's start with a simple scraper that looks performs a search and parses the results. The code below is designed to do exactly that.

from selenium import webdriver
from selenium.webdriver.common.by import By
#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")
#this function performs a search and parses the results
def search_page(query):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(f"https://www.google.com/search?q={query}")
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
#find the link element
link = div.find_elements(By.CSS_SELECTOR, "a")
#result number on the page
result_number = index
#if we have a result
if len(title) > 0:
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number}
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results

####this is our main program down here####
search_results = search_page("cool stuff")
#print our results
for result in search_results:
print(result)

In the code above, we:

  • Create a custom instance of ChromeOptions and add the "--headless" argument to it
  • Create a search_page() function that takes a query as a parameter
  • webdriver.Chrome(options=options) opens our browser in headless mode
  • We then use driver.get() to go to our site
  • We then find all of our target div elements using their CSS Selector...They are SUPER NESTED!
  • We create a list to hold our results
  • We create an index variable so that we can give each result a number
  • To avoid an "element not found" exception, we use find_elements() to get the title and link for each object
  • If the list returned by find_elements() is not empty, we save the following:
    • title.text
    • link.get_attribute("href")
    • result_number
  • After extracting the proper information, we append the object to our results list
  • Once we've gotten through all the results, we close the browser and return the results list

Step 2: Add Pagination

Now that we know how to scrape a single page, let's get started on adding pagination. As mentioned you read about the intial strategy of the scraper, the final formatted url should look like this:


'https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}'

Let's create a second function that takes our pagination into account. We'll also make some minor changes to the search_page() function.

from selenium import webdriver
from selenium.webdriver.common.by import By
#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")
#this function performs a search and parses the results
def search_page(query, page, location):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(f"https://www.google.com/search?q={query}&start={page * 10}&location={location}")
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#last link
last_link = ""
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
#find the link element
link = div.find_elements(By.CSS_SELECTOR, "a")
#result number on the page
result_number = index
#if we have a result
if len(title) > 0:
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page}
if site_info["link"] != last_link:
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#update the last link
last_link = site_info["link"]
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#iterate through our pages
for page in range(0, pages):
#get the results of the page
page_results = search_page(query, page, location)
#add them to the full_results list
full_results.extend(page_results)
#return the finalized list
return full_results
####this is our main program down here####
search_results = full_search("cool stuff")
#print our results
for result in search_results:
print(result)

This code is only slightly different from our first example:

  • search_page() now takes three arguments: query, page, and location
  • page and location have been added into the formatted url
  • We also created another variable, last_link and use it to prevent doubles from getting into our results
  • We created a new full_search() function
  • full_search() simply runs search_page() on a list of pages and returns a full list of results

Step 3: Storing the Scraped Data

In the previous iterations of this scraper, we focused on returning uniform dict objects from each of our functions. The reason for using these dictionaries is simple, when you hold object data in a dict of key-value pairs, it's really easy to transform it into something else.

Not all libraries are build to handle all data formats, but almost all of them support JSON or dictionaries (both of these formats are key-value pairs).

Now, we'll remove the following code from the bottom of the script:

#print our results
for result in search_results:
print(result)

Add the following line to your imports:

import csv

Now, we'll add the following to the bottom of the file:

#path to the csv file
path_to_csv = "search-results.csv"
#open the file in write mode
with open(path_to_csv, "w") as file:
#format the file based on the keys of the first result
writer = csv.DictWriter(file, search_results[0].keys())
#write the headers
writer.writeheader()
#write each object as a row in the file
writer.writerows(search_results)

In this snippet, we:

  • Create a path_to_csv variable
  • Open the file using path_to_csv and "w" as arguments to open the file in write mode
  • csv.DictWriter(file, search_results[0].keys()) tells the writer object to format our file based on the keys of the first dict object in our list
  • writer.writeheader() writes the actual headers to the document
  • writer.writerows(search_results) writes our actual search results to the csv file

Step 4: Adding Concurrency

If you've run any of the previous examples, you should have noticed that it takes about 15 seconds to scrape the default 3 pages. In its current structure, our script goes through and scrapes each page sequentially. We can speed this up by scraping them concurrently.

In this section, we're going to refactor our full_search() function so that things are done concurrently.

Here is our modified full_search() function:

#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#list of page numbers
page_numbers = list(range(0, pages))
#open with a max of 5 threads
with ThreadPoolExecutor(max_workers=5) as executor:
#call search page, pass all the following aruments into it
future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
#for each thread result
for page_result in future_results:
#add it to the full_results
full_results.extend(page_result)
#return the finalized list
return full_results

The full search function now does the following:

  • Create a list for our full results
  • Create a list of page numbers
  • Open a ThreadPoolExecutor instance with a max of 5 workers
  • executor.map(search_page, [query] * pages, page_numbers, [location] * pages) calls search_page() and passes in lists of arguments to it
  • We then take each page_result and use extend() to add it to the full_results list
  • Once finished, we return the list

Here is the newly updated file:

from selenium import webdriver
from selenium.webdriver.common.by import By
import csv
from concurrent.futures import ThreadPoolExecutor
#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")
#this function performs a search and parses the results
def search_page(query, page, location):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(f"https://www.google.com/search?q={query}&start={page * 10}&location={location}")
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#last link
last_link = ""
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
#find the link element
link = div.find_elements(By.CSS_SELECTOR, "a")
#result number on the page
result_number = index
#if we have a result
if len(title) > 0:
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page}
if site_info["link"] != last_link:
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#update the last link
last_link = site_info["link"]
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#list of page numbers
page_numbers = list(range(0, pages))
#open with a max of 5 threads
with ThreadPoolExecutor(max_workers=5) as executor:
#call search page, pass all the following aruments into it
future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
#for each thread result
for page_result in future_results:
#add it to the full_results
full_results.extend(page_result)
#return the finalized list
return full_results
####this is our main program down here####
#results from the search
search_results = full_search("cool stuff")
#path to the csv file
path_to_csv = "concurrency.csv"
#open the file in write mode
with open(path_to_csv, "w") as file:
#format the file based on the keys of the first result
writer = csv.DictWriter(file, search_results[0].keys())
#write the headers
writer.writeheader()
#write each object as a row in the file
writer.writerows(search_results)

Step 5: Bypassing Anti-Bots

When scraping in the wild, we often run into anti-bot software. Anti-bots are exactly what they sound like. Because our scraper is a programmatically controlled browser, anti-bots will often block scrapers even if they're not malicious. In order to get past anti-bots, it is always best practice to use a proxy.

There are many tools to integrate proxies with different browsers, but the easiest way to do so is with simple string formatting. Take a look at the function below.

def get_scrapeops_url(url):
payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
return proxy_url

As simple as it may look, this function holds the key to unlocking the power of the ScrapeOps Proxy. We simply encode our proxy params directly into the url that we want. We can then simply driver.get() this new proxied url just like we would with a non-proxied url. When scraping at scale, we need to use proxies consistently.

The ScrapeOps Proxy rotates IP addresses and always uses the best proxy available for each request. This actually allows each of our requests to show up as a different user with potentially a different browser, OS and often a different location as well.

When using a proxy, no one can block you based on your location, because your location changes whenever you make a new request to the site.

Here is a proxied version of our script:

from selenium import webdriver
from selenium.webdriver.common.by import By
from time import sleep
import csv
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlencode
#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")

API_KEY = "YOUR-SUPER-SECRET-API-KEY"
def get_scrapeops_url(url):
payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
return proxy_url

#this function performs a search and parses the results
def search_page(query, page, location):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(get_scrapeops_url(f"https://www.google.com/search?q={query}&start={page * 10}"))
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#last link
last_link = ""
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
link = div.find_elements(By.CSS_SELECTOR, "a")
if len(title) > 0 and len(link) > 0:
#result number on the page
result_number = index
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page}
if site_info["link"] != last_link:
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#update the last link
last_link = site_info["link"]
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#list of page numbers
page_numbers = list(range(0, pages))
#open with a max of 5 threads
with ThreadPoolExecutor(max_workers=5) as executor:
#call search page, pass all the following aruments into it
future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
#for each thread result
for page_result in future_results:
#add it to the full_results
full_results.extend(page_result)
#return the finalized list
return full_results

if __name__ == "__main__":
search_results = full_search("cool stuff")
#path to the csv file
path_to_csv = "proxied.csv"
#open the file in write mode
with open(path_to_csv, "w") as file:
#format the file based on the keys of the first result
writer = csv.DictWriter(file, search_results[0].keys())
#write the headers
writer.writeheader()
#write each object as a row in the file
writer.writerows(search_results)

Key things you should notice about this example:

  • "YOUR-SUPER-SECRET-API-KEY" should be replaced by your API key
  • get_scrapeops_url() converts normal urls into proxied ones
  • We have an actual main code block at the end of the script, this is because we're closer to production

Step 6: Production Run

Now, it's time for our production run. We added data storage earlier in the article. Since this example is meant to be the actual production code, we expand on that by adding a SearchData class and a DataPipeline class. SearchData doesn't do much other than hold and format individual results. The DataPipeline is where the real heavy lifting gets done as far as our production storage.

Here is our production scraper:

from selenium import webdriver
from selenium.webdriver.common.by import By
from time import sleep
import csv
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlencode
import os
import logging
from dataclasses import dataclass, field, fields, asdict

#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")

API_KEY = "YOUR-SUPER-SECRET-API-KEY"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SearchData:
name: str
link: str
result_number: int
page_number: int

def __post_init__(self):
self.check_string_fields()

def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())

class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False

def save_to_csv(self):
self.csv_file_open = True
self.data_to_save = []
self.data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not self.data_to_save:
return
keys = [field.name for field in fields(self.data_to_save[0])]

file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="UTF-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in self.data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False

def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped")
return True
self.names_seen.append(input_data.name)
return False

def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()

def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()

def get_scrapeops_url(url):
payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
return proxy_url

#this function performs a search and parses the results
def search_page(query, page, location):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(get_scrapeops_url(f"https://www.google.com/search?q={query}&start={page * 10}"))
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#last link
last_link = ""
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
link = div.find_elements(By.CSS_SELECTOR, "a")
if len(title) > 0 and len(link) > 0:
#result number on the page
result_number = index
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page}
if site_info["link"] != last_link:
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#update the last link
last_link = site_info["link"]
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#list of page numbers
page_numbers = list(range(0, pages))
#open with a max of 5 threads
with ThreadPoolExecutor(max_workers=5) as executor:
#call search page, pass all the following aruments into it
future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
#for each thread result
for page_result in future_results:
#add it to the full_results
full_results.extend(page_result)
#return the finalized list
return full_results

if __name__ == "__main__":

logger.info("Starting scrape")
data_pipeline = DataPipeline(csv_filename="production-search.csv")

search_results = full_search("cool stuff")

for result in search_results:
search_data = SearchData(name=result["title"], link=result["link"], result_number=result["result_number"] , page_number=result["page"])
data_pipeline.add_data(search_data)

data_pipeline.close_pipeline()
logger.info("Scrape Complete")
  • To change the output filename, simply change "production-search.csv" to your desired filename
  • To change your search query, change "cool stuff" to whatever query you'd like to perform
  • If you'd like to scrape a different amount of pages, you can use the pages kwarg in the full_search() function:
  • If you want 1000 pages of boring stuff, you could do full_search("boring stuff", pages=1000)

While scraping public data (if you don't have to login to view the data, it's public data.) is generally considered legally acceptable, when scraping in production, always respect the policies of the website you're trying to scrape.

Always remember that public data is fair game. Don't scrape people's personal information and certainly don't share it... Be respectful of other people and their privacy.

If you are not sure about the policies of the website you're scraping check their robots.txt. You can view Google's robots.txt here.

Another thing to consider is the terms & service (T&C) policies of the websites. Unauthorized scraping or violating terms of service may result in legal action or being blocked from accessing services.

According to the T&C policy, Google reserves the right to suspend or terminate your access to the services or delete your Google Account if they reasonably believe that your conduct causes harm or liability to a user, third party, or Google — for example, by hacking, phishing, harassing, spamming, misleading others, or scraping content that doesn’t belong to you.

It's crucial to consider not only the legality of scraping data but also how the scraped data will be used. Data scraped from Google or other websites may be subject to copyright laws or regulations governing personal data, depending on the jurisdiction and intended use.


Conclusion

You've made it to the end. You now have a decent understanding of how to build a production scraper. You've learned how to parse data, how to add concurrency and how to integrate a proxy with Selenium.

If you'd like to see documentation related to this article, take a look at the links below.


More Python Web Scraping Guides

Now that you've learned all this, take your new skills and build something! Here at ScrapeOps, we have all sorts of interesting stuff to read. If you're in the mood to learn more, take a look at one of the articles below.