How to Scrape Google Search Results With Selenium
Google is the most widely used search engine in the world. If you're reading this article, you probably found it on Google, and chances are you use Google several times a day without even thinking about it.
What if you could harness Google Search results for web scraping? You can!
Not only can you integrate Google Search into your own scraper, but doing so lays the bedrock for creating your own crawler! In today's data-driven world, this is a priceless skill to have in your scraping toolbox.
In this extensive guide, we'll take you through how to scrape Google Search Results using Selenium.
- TLDR: How to Scrape Google Search with Selenium
- How To Architect Our Google Scraper
- Understanding How To Scrape Google Search
- Building A Google Search Scraper
- Legal and Ethical Considerations
- Conclusion
- More Python Web Scraping Guides
TLDR: How to Scrape Google Search with Selenium
When we scrape Google Search, we get results from all over the web. Below is a production-ready scraper already built to use the ScrapeOps Proxy API Aggregator. It gives us access to results from across the web and also gives us the beginning of a much larger crawler.
from selenium import webdriver
from selenium.webdriver.common.by import By
from time import sleep
import time  #used by DataPipeline.close_pipeline()
import csv
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlencode
import os
import logging
from dataclasses import dataclass, field, fields, asdict
#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str
link: str
result_number: int
page_number: int
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
self.data_to_save = []
self.data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not self.data_to_save:
return
keys = [field.name for field in fields(self.data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="UTF-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in self.data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def get_scrapeops_url(url):
payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
return proxy_url
#this function performs a search and parses the results
def search_page(query, page, location):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(get_scrapeops_url(f"https://www.google.com/search?q={query}&start={page * 10}"))
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#last link
last_link = ""
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
link = div.find_elements(By.CSS_SELECTOR, "a")
if len(title) > 0 and len(link) > 0:
#result number on the page
result_number = index
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page}
if site_info["link"] != last_link:
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#update the last link
last_link = site_info["link"]
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#list of page numbers
page_numbers = list(range(0, pages))
#open with a max of 5 threads
with ThreadPoolExecutor(max_workers=5) as executor:
#call search_page(), passing all the following arguments into it
future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
#for each thread result
for page_result in future_results:
#add it to the full_results
full_results.extend(page_result)
#return the finalized list
return full_results
if __name__ == "__main__":
logger.info("Starting scrape")
data_pipeline = DataPipeline(csv_filename="production-search.csv")
search_results = full_search("cool stuff")
for result in search_results:
search_data = SearchData(name=result["title"], link=result["link"], result_number=result["result_number"] , page_number=result["page"])
data_pipeline.add_data(search_data)
data_pipeline.close_pipeline()
logger.info("Scrape Complete")
- To run this scraper, simply paste the code into a file and run python your-script.py
- To change the query, simply replace "cool stuff" with whatever you'd like to search for
- To change the result count, change the pages kwarg. For example, to run a search of 100 pages, you would call full_search("boring stuff", pages=100)
How To Architect Our Google Scraper
When scraping search results from Google, we need to be able to do the following:
- Create a Google Search.
- Interpret the results.
- Fetch more results.
- Repeat steps 2 and 3 until we have our desired data.
Our Google scraper needs to be able to parse a results page, manage pagination, perform tasks concurrently, and work with a proxy.
Why does our scraper need these qualities?
- To extract data from a page, we need to parse the HTML.
- To request different pages (batches of data), we need to control our pagination.
- When parsing our data concurrently, our scraper will complete tasks quicker because multiple things are happening at the same time.
- When we use a proxy, we greatly decrease our chances of getting blocked, and we can also control our location far more reliably, because the proxy gives us an IP address matching the location we choose.
Understanding How To Scrape Google Search
Let's get started building our scraper.
First, we're going to simply parse and extract data from a Google Search result. Once we can handle a single page, we'll add support for pagination.
Then, we'll learn how to store this data in a CSV file. At this point, we'll technically have a working scraper from start to finish, so we'll focus on making improvements by adding concurrency for speed and efficiency.
Finally, we'll add proxy support to make our scraper stable and reliable.
Step 1: How To Request Google Search Pages
Take a look at the image below:
If you look at the address bar, you should see:
https://www.google.com/search?q=cool+stuff
Let's break this down:
- Our base domain is https://www.google.com
- The endpoint we want from that domain is /search
- ?q=cool+stuff represents the query we're making:
- ? denotes the start of the query string
- q is the parameter holding the term we're querying
- cool+stuff is equivalent to the string "cool stuff"; the + denotes a space between words
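If you'd like to build these URLs in code rather than by hand, here is a minimal sketch using only Python's standard library (the build_search_url() helper is purely illustrative and isn't part of the scraper we build later):

from urllib.parse import quote_plus

#quote_plus() turns "cool stuff" into "cool+stuff" for us
def build_search_url(query):
    return f"https://www.google.com/search?q={quote_plus(query)}"

print(build_search_url("cool stuff"))
#prints: https://www.google.com/search?q=cool+stuff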
Step 2: How To Extract Data From Google Search
As you might have noticed in the screenshot earlier, each of our search results comes with an <h3> tag, so this is a good place to look. If you inspect the page further, you'll notice that each of these headers is deeply nested inside a number of <div> tags.
To find our results, we need to find all the div elements containing these h3 elements. If we properly identify and parse each div, we can extract all of the relevant information from it.
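As a rough sketch of that idea, assuming each organic result's h3 title sits inside an a tag (Google's layout changes often, so treat this as an assumption), you could also approach it from the anchor side: grab every a element and only keep the ones that contain an h3. This isn't the exact selector strategy we use in the rest of this guide, but it illustrates the same parent/child relationship:

from selenium import webdriver
from selenium.webdriver.common.by import By

options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
driver.get("https://www.google.com/search?q=cool+stuff")

results = []
#only anchors that wrap an h3 title look like organic results
for link in driver.find_elements(By.CSS_SELECTOR, "a"):
    titles = link.find_elements(By.CSS_SELECTOR, "h3")
    if titles:
        results.append({"title": titles[0].text, "link": link.get_attribute("href")})

driver.quit()
print(results)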
Step 3: How To Control Pagination
As mentioned previously, ? denotes the start of the query string, and we can add additional query parameters using &. Google typically gives us results in batches of 10, so with this in mind, we can request multiple "pages" by passing in a start parameter.
After the start parameter is added, our formatted url looks like this:
'https://www.google.com/search?q={query}&start={page * 10}'
We pass our page number multiplied by 10 because of the way our results get delivered: if we want to start at the beginning, our start would be {0 * 10}. The next batch of results would be {1 * 10}, then {2 * 10}, and so on.
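As a quick sketch of how those start values line up with page numbers (the build_page_urls() helper below is only for illustration):

#build one search url per page using the start parameter
def build_page_urls(query, pages):
    urls = []
    for page in range(pages):
        #page 0 -> start=0, page 1 -> start=10, page 2 -> start=20
        urls.append(f"https://www.google.com/search?q={query}&start={page * 10}")
    return urls

for url in build_page_urls("cool+stuff", 3):
    print(url)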
Step 4: Geolocated Data
Speaking of query params, we can also add one for location. If we add the geo_location parameter to our query, we can get results based on that individual location.
Now, our formatted url would look like this:
'https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}'
While this is an extremely small change, it gives us the power to drastically change our results.
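Here is the earlier URL-building sketch extended with the geo_location parameter; location names containing spaces should be URL-encoded (build_geo_url() is, again, just an illustrative helper):

from urllib.parse import quote_plus

#extend the paginated url with a geo_location parameter
def build_geo_url(query, page, location):
    return (
        "https://www.google.com/search"
        f"?q={quote_plus(query)}"
        f"&start={page * 10}"
        f"&geo_location={quote_plus(location)}"
    )

print(build_geo_url("cool stuff", 0, "United States"))
#prints: https://www.google.com/search?q=cool+stuff&start=0&geo_location=United+States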
Setting Up Our Google Scraper Project
Now that we understand the basic strategy that our scraper needs to execute, let's get started on building it!
We'll start by creating a new project folder. You can do this from within your file explorer or run the command below.
mkdir google-search
From within the project folder, we want to create a new virtual environment.
Linux/Mac
python3 -m venv google-search
Windows
python -m venv google-search
Once we've created our new environment, let's activate it:
Linux/Mac
source google-search/bin/activate
Windows
.\google-search\Scripts\Activate.ps1
Once our environment has been activated, it's time to install dependencies. We can install Selenium through pip.
pip install selenium
You will also need to ensure that you have Chrome and ChromeDriver installed. You can check your version of Chrome with the following command:
google-chrome --version
It should output a result similar to this:
Google Chrome 123.0.6312.105
Once you know which version of Chrome you're using, head over to https://chromedriver.chromium.org/ and download the matching driver version.
If you are using an older version of Chrome, you may have to update your driver more often. ChromeDriver 115 and above support automated driver management, which makes dependencies a bit easier to maintain.
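If you want to confirm that Chrome, the driver, and Selenium are all wired up correctly before moving on, a quick smoke test like the one below should open a headless browser, print the page title, and exit cleanly (this is just a sanity check, not part of the scraper):

from selenium import webdriver

#the same headless options we'll use throughout this guide
options = webdriver.ChromeOptions()
options.add_argument("--headless")

driver = webdriver.Chrome(options=options)
driver.get("https://www.google.com")
print(driver.title)  #should print something like "Google"
driver.quit()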
Building A Google Search Scraper
As we know, our scraper needs to be able to make custom requests in this format:
'https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}'
Now let's begin building a Selenium scraper that can handle this. Our scraper needs to operate in the following steps:
- Launch a headless browser
- Get a page of results
- Interpret the results
- Repeat steps 2 and 3 until we have our desired data
- Save the data
- Close the browser and exit the program
Step 1: Create Simple Search Data Parser
Let's start with a simple scraper that performs a search and parses the results. The code below is designed to do exactly that.
from selenium import webdriver
from selenium.webdriver.common.by import By
#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")
#this function performs a search and parses the results
def search_page(query):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(f"https://www.google.com/search?q={query}")
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
#find the link element
link = div.find_elements(By.CSS_SELECTOR, "a")
#result number on the page
result_number = index
#if we have a result
if len(title) > 0 and len(link) > 0:
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number}
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
####this is our main program down here####
search_results = search_page("cool stuff")
#print our results
for result in search_results:
print(result)
In the code above, we:
- Create a custom instance of ChromeOptions and add the "--headless" argument to it
- Create a search_page() function that takes a query as a parameter
- webdriver.Chrome(options=options) opens our browser in headless mode
- We then use driver.get() to go to our site
- We then find all of our target div elements using their CSS selector...they are SUPER NESTED!
- We create a list to hold our results
- We create an index variable so that we can give each result a number
- To avoid an "element not found" exception, we use find_elements() to get the title and link for each result
- If the lists returned by find_elements() are not empty, we save the following: title.text, link.get_attribute("href"), and result_number
- After extracting the proper information, we append the object to our results list
- Once we've gotten through all the results, we close the browser and return the results list
Step 2: Add Pagination
Now that we know how to scrape a single page, let's get started on adding pagination. As mentioned when we covered the initial strategy of the scraper, the final formatted url should look like this:
'https://www.google.com/search?q={query}&start={page * 10}&geo_location={location}'
Let's create a second function that takes our pagination into account. We'll also make some minor changes to the search_page() function.
from selenium import webdriver
from selenium.webdriver.common.by import By
#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")
#this function performs a search and parses the results
def search_page(query, page, location):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(f"https://www.google.com/search?q={query}&start={page * 10}&location={location}")
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#last link
last_link = ""
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
#find the link element
link = div.find_elements(By.CSS_SELECTOR, "a")
#result number on the page
result_number = index
#if we have a result
if len(title) > 0 and len(link) > 0:
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page}
if site_info["link"] != last_link:
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#update the last link
last_link = site_info["link"]
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#iterate through our pages
for page in range(0, pages):
#get the results of the page
page_results = search_page(query, page, location)
#add them to the full_results list
full_results.extend(page_results)
#return the finalized list
return full_results
####this is our main program down here####
search_results = full_search("cool stuff")
#print our results
for result in search_results:
print(result)
This code is only slightly different from our first example:
- search_page() now takes three arguments: query, page, and location
- page and location have been added into the formatted url (as the start and geo_location parameters)
- We also created another variable, last_link, and use it to prevent duplicate results from getting into our list
- We created a new full_search() function
- full_search() simply runs search_page() on a list of pages and returns a full list of results
Step 3: Storing the Scraped Data
In the previous iterations of this scraper, we focused on returning uniform dict objects from each of our functions. The reason for using these dictionaries is simple: when you hold object data in a dict of key-value pairs, it's really easy to transform it into something else.
Not all libraries are built to handle every data format, but almost all of them support JSON or dictionaries (both of which are key-value pairs).
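To illustrate just how flexible this is, the same list of result dictionaries could be dumped straight to JSON in a couple of lines. This is only an aside with hypothetical sample data; the rest of this step sticks with CSV:

import json

#hypothetical results in the same shape our scraper returns
search_results = [
    {"title": "Example Site", "link": "https://example.com", "result_number": 0, "page": 0}
]

with open("search-results.json", "w") as file:
    json.dump(search_results, file, indent=2)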
Now, we'll remove the following code from the bottom of the script:
#print our results
for result in search_results:
print(result)
Add the following line to your imports:
import csv
Now, we'll add the following to the bottom of the file:
#path to the csv file
path_to_csv = "search-results.csv"
#open the file in write mode
with open(path_to_csv, "w") as file:
#format the file based on the keys of the first result
writer = csv.DictWriter(file, search_results[0].keys())
#write the headers
writer.writeheader()
#write each object as a row in the file
writer.writerows(search_results)
In this snippet, we:
- Create a path_to_csv variable
- Open the file using path_to_csv and "w" as arguments to open the file in write mode
- csv.DictWriter(file, search_results[0].keys()) tells the writer object to format our file based on the keys of the first dict object in our list
- writer.writeheader() writes the actual headers to the document
- writer.writerows(search_results) writes our actual search results to the csv file
Step 4: Adding Concurrency
If you've run any of the previous examples, you should have noticed that it takes about 15 seconds to scrape the default 3 pages. In its current structure, our script goes through and scrapes each page sequentially. We can speed this up by scraping them concurrently.
In this section, we're going to refactor our full_search() function so that the pages are scraped concurrently.
Here is our modified full_search() function:
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#list of page numbers
page_numbers = list(range(0, pages))
#open with a max of 5 threads
with ThreadPoolExecutor(max_workers=5) as executor:
#call search_page(), passing all the following arguments into it
future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
#for each thread result
for page_result in future_results:
#add it to the full_results
full_results.extend(page_result)
#return the finalized list
return full_results
The full_search() function now does the following:
- Creates a list for our full results
- Creates a list of page numbers
- Opens a ThreadPoolExecutor instance with a max of 5 workers
- executor.map(search_page, [query] * pages, page_numbers, [location] * pages) calls search_page() and passes the argument lists into it (see the small sketch after this list)
- We then take each page_result and use extend() to add it to the full_results list
- Once finished, we return the list
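If the [query] * pages trick looks odd, here is a tiny standalone sketch (illustrative only, using a hypothetical show_args() function) of how executor.map() pairs up the argument lists, one element from each list per call:

from concurrent.futures import ThreadPoolExecutor

def show_args(query, page, location):
    return f"{query} | page {page} | {location}"

pages = 3
with ThreadPoolExecutor(max_workers=5) as executor:
    #each call receives query[i], page_numbers[i], location[i]
    for line in executor.map(show_args, ["cool stuff"] * pages, range(pages), ["United States"] * pages):
        print(line)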
Here is the newly updated file:
from selenium import webdriver
from selenium.webdriver.common.by import By
import csv
from concurrent.futures import ThreadPoolExecutor
#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")
#this function performs a search and parses the results
def search_page(query, page, location):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(f"https://www.google.com/search?q={query}&start={page * 10}&location={location}")
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#last link
last_link = ""
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
#find the link element
link = div.find_elements(By.CSS_SELECTOR, "a")
#result number on the page
result_number = index
#if we have a result
if len(title) > 0 and len(link) > 0:
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page}
if site_info["link"] != last_link:
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#update the last link
last_link = site_info["link"]
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#list of page numbers
page_numbers = list(range(0, pages))
#open with a max of 5 threads
with ThreadPoolExecutor(max_workers=5) as executor:
#call search_page(), passing all the following arguments into it
future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
#for each thread result
for page_result in future_results:
#add it to the full_results
full_results.extend(page_result)
#return the finalized list
return full_results
####this is our main program down here####
#results from the search
search_results = full_search("cool stuff")
#path to the csv file
path_to_csv = "concurrency.csv"
#open the file in write mode
with open(path_to_csv, "w") as file:
#format the file based on the keys of the first result
writer = csv.DictWriter(file, search_results[0].keys())
#write the headers
writer.writeheader()
#write each object as a row in the file
writer.writerows(search_results)
Step 5: Bypassing Anti-Bots
When scraping in the wild, we often run into anti-bot software: systems designed to detect and block automated traffic. Because our scraper is a programmatically controlled browser, anti-bots will often block it even when it's not doing anything malicious. To get past anti-bots, it is best practice to use a proxy.
There are many tools to integrate proxies with different browsers, but the easiest way to do so is with simple string formatting. Take a look at the function below.
def get_scrapeops_url(url):
payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
return proxy_url
As simple as it may look, this function holds the key to unlocking the power of the ScrapeOps Proxy. We encode our proxy parameters directly into the url we want, and we can then driver.get() this new proxied url just like we would a non-proxied one. When scraping at scale, we need to use proxies consistently.
The ScrapeOps Proxy rotates IP addresses and always uses the best proxy available for each request. This allows each of our requests to show up as a different user, potentially with a different browser, OS, and location.
With a proxy in place, it's much harder for a site to block you based on your location, because your apparent location can change whenever you make a new request.
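To make the change concrete, here is a minimal standalone sketch of how get_scrapeops_url() slots into an ordinary driver.get() call; the full proxied script follows below:

from selenium import webdriver
from urllib.parse import urlencode

API_KEY = "YOUR-SUPER-SECRET-API-KEY"

def get_scrapeops_url(url):
    payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
    return 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)

options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)

#the only change from the unproxied version: wrap the target url first
driver.get(get_scrapeops_url("https://www.google.com/search?q=cool+stuff"))
print(driver.title)
driver.quit()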
Here is a proxied version of our script:
from selenium import webdriver
from selenium.webdriver.common.by import By
from time import sleep
import csv
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlencode
#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
def get_scrapeops_url(url):
payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
return proxy_url
#this function performs a search and parses the results
def search_page(query, page, location):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(get_scrapeops_url(f"https://www.google.com/search?q={query}&start={page * 10}"))
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#last link
last_link = ""
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
link = div.find_elements(By.CSS_SELECTOR, "a")
if len(title) > 0 and len(link) > 0:
#result number on the page
result_number = index
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page}
if site_info["link"] != last_link:
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#update the last link
last_link = site_info["link"]
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#list of page numbers
page_numbers = list(range(0, pages))
#open with a max of 5 threads
with ThreadPoolExecutor(max_workers=5) as executor:
#call search_page(), passing all the following arguments into it
future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
#for each thread result
for page_result in future_results:
#add it to the full_results
full_results.extend(page_result)
#return the finalized list
return full_results
if __name__ == "__main__":
search_results = full_search("cool stuff")
#path to the csv file
path_to_csv = "proxied.csv"
#open the file in write mode
with open(path_to_csv, "w") as file:
#format the file based on the keys of the first result
writer = csv.DictWriter(file, search_results[0].keys())
#write the headers
writer.writeheader()
#write each object as a row in the file
writer.writerows(search_results)
Key things you should notice about this example:
- "YOUR-SUPER-SECRET-API-KEY" should be replaced by your API key
- get_scrapeops_url() converts normal urls into proxied ones
- We have an actual main code block at the end of the script, because we're getting closer to production
Step 6: Production Run
Now it's time for our production run. We added data storage earlier in the article; since this example is meant to be actual production code, we expand on that by adding a SearchData class and a DataPipeline class. SearchData doesn't do much other than hold and format individual results. The DataPipeline is where the real heavy lifting gets done for our production storage.
Here is our production scraper:
from selenium import webdriver
from selenium.webdriver.common.by import By
from time import sleep
import time  #used by DataPipeline.close_pipeline()
import csv
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlencode
import os
import logging
from dataclasses import dataclass, field, fields, asdict
#create a custom options instance
options = webdriver.ChromeOptions()
#add headless mode to our options
options.add_argument("--headless")
API_KEY = "YOUR-SUPER-SECRET-API-KEY"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SearchData:
name: str
link: str
result_number: int
page_number: int
def __post_init__(self):
self.check_string_fields()
def check_string_fields(self):
for field in fields(self):
if isinstance(getattr(self, field.name), str):
if getattr(self, field.name) == '':
setattr(self, field.name, f"No {field.name}")
continue
value = getattr(self, field.name)
setattr(self, field.name, value.strip())
class DataPipeline:
def __init__(self, csv_filename="", storage_queue_limit=50):
self.names_seen = []
self.storage_queue = []
self.storage_queue_limit = storage_queue_limit
self.csv_filename = csv_filename
self.csv_file_open = False
def save_to_csv(self):
self.csv_file_open = True
self.data_to_save = []
self.data_to_save.extend(self.storage_queue)
self.storage_queue.clear()
if not self.data_to_save:
return
keys = [field.name for field in fields(self.data_to_save[0])]
file_exists = os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
with open(self.csv_filename, mode="a", newline="", encoding="UTF-8") as output_file:
writer = csv.DictWriter(output_file, fieldnames=keys)
if not file_exists:
writer.writeheader()
for item in self.data_to_save:
writer.writerow(asdict(item))
self.csv_file_open = False
def is_duplicate(self, input_data):
if input_data.name in self.names_seen:
logger.warning(f"Duplicate item found: {input_data.name}. Item dropped")
return True
self.names_seen.append(input_data.name)
return False
def add_data(self, scraped_data):
if self.is_duplicate(scraped_data) == False:
self.storage_queue.append(scraped_data)
if len(self.storage_queue) >= self.storage_queue_limit and self.csv_file_open == False:
self.save_to_csv()
def close_pipeline(self):
if self.csv_file_open:
time.sleep(3)
if len(self.storage_queue) > 0:
self.save_to_csv()
def get_scrapeops_url(url):
payload = {'api_key': API_KEY, 'url': url, 'country': 'us'}
proxy_url = 'https://proxy.scrapeops.io/v1/?' + urlencode(payload)
return proxy_url
#this function performs a search and parses the results
def search_page(query, page, location):
#start Chrome with our custom options
driver = webdriver.Chrome(options=options)
#go to the page
driver.get(get_scrapeops_url(f"https://www.google.com/search?q={query}&start={page * 10}"))
#find each div containing site info...THEY'RE SUPER NESTED!!!
divs = driver.find_elements(By.CSS_SELECTOR, "div > div > div > div > div > div > div > div > div > div > div > div > div > div")
#list to hold our results
results = []
#index, this will be used to number the results
index = 0
#last link
last_link = ""
#iterate through our divs
for div in divs:
#find the title element
title = div.find_elements(By.CSS_SELECTOR, "h3")
link = div.find_elements(By.CSS_SELECTOR, "a")
if len(title) > 0 and len(link) > 0:
#result number on the page
result_number = index
#site info object
site_info = {"title": title[0].text, "link": link[0].get_attribute("href"), "result_number": result_number, "page": page}
if site_info["link"] != last_link:
#add the object to our list
results.append(site_info)
#increment the index
index += 1
#update the last link
last_link = site_info["link"]
#the scrape has finished, close the browser
driver.quit()
#return the result list
return results
#function to search multiple pages, calls search_page() on each
def full_search(query, pages=3, location="United States"):
#list for our full results
full_results = []
#list of page numbers
page_numbers = list(range(0, pages))
#open with a max of 5 threads
with ThreadPoolExecutor(max_workers=5) as executor:
#call search_page(), passing all the following arguments into it
future_results = executor.map(search_page, [query] * pages, page_numbers, [location] * pages)
#for each thread result
for page_result in future_results:
#add it to the full_results
full_results.extend(page_result)
#return the finalized list
return full_results
if __name__ == "__main__":
logger.info("Starting scrape")
data_pipeline = DataPipeline(csv_filename="production-search.csv")
search_results = full_search("cool stuff")
for result in search_results:
search_data = SearchData(name=result["title"], link=result["link"], result_number=result["result_number"] , page_number=result["page"])
data_pipeline.add_data(search_data)
data_pipeline.close_pipeline()
logger.info("Scrape Complete")
- To change the output filename, simply change "production-search.csv" to your desired filename
- To change your search query, change "cool stuff" to whatever query you'd like to perform
- If you'd like to scrape a different number of pages, use the pages kwarg in the full_search() function. For example, if you want 1000 pages of boring stuff, you could call full_search("boring stuff", pages=1000)
Legal and Ethical Considerations
While scraping public data (if you don't have to log in to view the data, it's generally considered public) is usually regarded as legally acceptable, when scraping in production you should always respect the policies of the website you're scraping.
Public data is generally fair game, but don't scrape people's personal information, and certainly don't share it. Be respectful of other people and their privacy.
If you are not sure about the policies of the website you're scraping, check its robots.txt. You can view Google's robots.txt at https://www.google.com/robots.txt.
Another thing to consider is the website's terms and conditions (T&C). Unauthorized scraping or violating the terms of service may result in legal action or being blocked from accessing services.
According to Google's terms, Google reserves the right to suspend or terminate your access to its services or delete your Google Account if it reasonably believes that your conduct causes harm or liability to a user, third party, or Google; for example, by hacking, phishing, harassing, spamming, misleading others, or scraping content that doesn't belong to you.
It's crucial to consider not only the legality of scraping data but also how the scraped data will be used. Data scraped from Google or other websites may be subject to copyright laws or regulations governing personal data, depending on the jurisdiction and intended use.
Conclusion
You've made it to the end. You now have a solid understanding of how to build a production scraper: how to parse data, how to add concurrency, and how to integrate a proxy with Selenium.
If you'd like to see documentation related to this article, take a look at the links below.
More Python Web Scraping Guides
Now that you've learned all this, take your new skills and build something! Here at ScrapeOps, we have all sorts of interesting stuff to read. If you're in the mood to learn more, take a look at one of the articles below.