Tag: kibana logstash elastic elasticsearch dashboard fun portfolio

Setting up a commuters dashboard: Part 1 – Collecting freeway traffic

Elastic stack see here

Python traffic collector

Set the stage for the rest of the script. Importing everything necessary to pull traffic via googlemaps API and log results to a file

import googlemaps
 from datetime import datetime
 import os, json
 import logging, sys,time, yaml

logger = logging.getLogger('maps')
 logger.setLevel(logging.DEBUG)
 fh = logging.FileHandler('/var/log/maps.log')
 fh.setLevel(logging.DEBUG)
 # create console handler with a higher log level
 ch = logging.StreamHandler(sys.stdout)
 ch.setLevel(logging.DEBUG)
 # create formatter and add it to the handlers
 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 fh.setFormatter(formatter)
 ch.setFormatter(formatter)
 # add the handlers to the logger
 logger.addHandler(fh)
 logger.addHandler(ch)

 

To keep the keys outside the code, I used a yaml file to reference the my simple config. Depending on the use case and other implementations, system environmental variables or other configurations could be used here.

def getKey():
 try:
 GKEYPATH='/home/traffic-app/.gkey.yml'
 yaml_file=open(GKEYPATH)

GKEY = yaml.load(yaml_file)['gkey']

return GKEY
 except Exception as stderr:
 logger.critical(stderr)

 

The brain of the script is calculating the duration from point A to point B. This is done with googles gmaps API by using geocoded locations and making a call for directions. Part of the response is the duration in traffic. The logger object allows us to log to a file while seeing it in the terminal. This can eventually be fed into an S3 bucket for long term retention, and followed in real time for live interactions

def duration(geocoded_a, geocoded_b):
 now = datetime.now()
 geolat_a = json.dumps(geocoded_a[0]['geometry']['location']['lat'])
 geolon_a = json.dumps(geocoded_a[0]['geometry']['location']['lng'])

geolat_b = json.dumps(geocoded_b[0]['geometry']['location']['lat'])
 geolon_b = json.dumps(geocoded_b[0]['geometry']['location']['lng'])

loc_a = json.dumps(geocoded_a[0]['formatted_address'])
 loc_b = json.dumps(geocoded_b[0]['formatted_address'])

directions_result = gmaps.directions('{},{}'.format(geolat_a,geolon_a),
 '{},{}'.format(geolat_b,geolon_b),
 mode="driving",
 departure_time=now)

duration=json.dumps(directions_result[0]['legs'][0]['duration']['value'])
 duration_traffic=json.dumps(directions_result[0]['legs'][0]['duration_in_traffic']['value'])

logger.info('{a} - {b} - {duration} - {duration_traffic}'.format(a=loc_a, b=loc_b, duration=duration, duration_traffic=duration_traffic))

 

The calls are made forever, sleeping every 10 minute to avoid running out of API calls. The traffic doesn’t update more frequently, so this is an acceptable refresh rate.

GKEY=getKey()
 gmaps = googlemaps.Client(key=GKEY)

geocoded_a = gmaps.geocode(LOCATION_A)
 geocoded_b = gmaps.geocode(LOCATION_B)

while True:
 duration(geocoded_a, geocoded_b)
 duration(geocoded_b, geocoded_a)
 time.sleep(600)
 except Exception as stderr:
 logger.critical(stderr)