The script below will take a CSV containing locations and find them.
It has the following features:
- You can swap out geocoding services
- For each run, it will look at the results of the past run as a cache
- For runs that hit an API, it will pause briefly to avoid rate limiting
- It caches the results as it goes, so that you do not hit an API too many times
- It prints out stats on how many rows failed to find matches, so that you can correct the input
- It handles blank rows – this allows you to copy from a partially populated spreadsheet, and pasted back into the original without alignment issues
Usage:
python3 geocode.py input.csv output.csv address
from geopy.geocoders import Nominatim
from functools import cache
import sys
import csv
import time
import os
inputfile = sys.argv[1]
outputfile = sys.argv[2]
inputlocation = sys.argv[3]
print("Reading: " + inputfile)
print("Writing: " + outputfile)
print("Address Column: " + inputlocation)
geolocator = Nominatim(user_agent="tracker")
class stats:
lookups = 0
totalrows = 0
skips = 0
unknowns = 0
successes = 0
priorruncachehits = 0
@cache
def lookup(value):
stats.lookups = stats.lookups + 1
return geolocator.geocode(locationvalue, language="en")
previous = {}
if (os.path.exists(outputfile)):
with open(outputfile, newline='') as f:
reader = csv.DictReader(f)
for row in reader:
locationvalue = row[inputlocation]
resolved = row['resolved_' + inputlocation]
if (resolved != None and resolved != ""):
print('resolved: ' + resolved + " " + str(row))
cachedlocation = {}
cachedlocation[inputlocation] = locationvalue
cachedlocation['address'] = resolved
cachedlocation['latitude'] = row['latitude']
cachedlocation['longitude'] = row['longitude']
previous[locationvalue] = cachedlocation
with open(outputfile, 'w+', newline='') as f:
writer = csv.writer(f)
headers = [inputlocation, 'resolved_' + inputlocation, 'latitude', 'longitude']
writer.writerow(headers)
with open('input.csv', newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
stats.totalrows = stats.totalrows + 1
locationvalue = row[inputlocation]
print(locationvalue)
if (locationvalue != ""):
if (locationvalue in previous.keys()):
stats.priorruncachehits = stats.priorruncachehits + 1
writer.writerow([locationvalue, previous[locationvalue]['address'], previous[locationvalue]['latitude'], previous[locationvalue]['longitude']])
else:
location = lookup(locationvalue)
if (location == None):
stats.unknowns = stats.unknowns + 1
print("Failed to find " + locationvalue)
writer.writerow([locationvalue, "", "", ""])
else:
stats.successes = stats.successes + 1
print(location)
writer.writerow([locationvalue, location.address, location.latitude, location.longitude])
time.sleep(1)
else:
stats.skips = stats.skips + 1
writer.writerow(["", "", "", ""])
print("rows: " + str(stats.totalrows))
print("successes: " + str(stats.successes))
print("lookups: " + str(stats.lookups))
print("blank lines: " + str(stats.skips))
print("unknowns: " + str(stats.unknowns))
print("cache hits from prior run: " + str(stats.priorruncachehits))