Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 160 additions & 24 deletions gateway/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,70 @@
import flask
import paho.mqtt.client as mqtt
import werkzeug.security as wsecurity
import structlog

from urllib.parse import urlparse
import os
import json
import time
import queue
import functools

import logging
logging.basicConfig()
log_level = os.environ.get('LOGLEVEL', 'info')
level = getattr(logging, log_level.upper())
import uuid

def make_id():
import datetime
import uuid

t = datetime.datetime.now().strftime('%Y%m%d-%H%M')
u = str(uuid.uuid4())[0:4]
id = f'{t}-{u}'
return id

def configure_logging(log_path, filename=None):

if not os.path.exists(log_path):
os.makedirs(log_path)

if filename is None:
filename = f'{make_id()}.gateway.log'

key_order = ['door', 'method', 'path', 'user', 'request_id', 'topic', 'payload']

# FIXME: add timestamping
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timestamping is added by journald/docker so it might not be needed.

structlog.configure(
processors=[
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
)

formatter = structlog.stdlib.ProcessorFormatter(
processor=structlog.dev.ConsoleRenderer(),
)

file_formatter = structlog.stdlib.ProcessorFormatter(
structlog.processors.KeyValueRenderer(key_order=key_order, drop_missing=True)
)

file = logging.FileHandler(filename)
stdout = logging.StreamHandler()

stdout.setFormatter(formatter)
file.setFormatter(file_formatter)

log_mqtt = logging.getLogger('mqtt')
log_mqtt.setLevel(level)
log = logging.getLogger('gateway')
log.setLevel(level)
logging.basicConfig(
format="%(message)s",
level=logging.DEBUG,
handlers=[file, stdout],
)


log_path = os.environ.get('DLOCK_LOG_PATH', 'logs')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the log file rotated with this?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No but there is a handler in the standard library which has rotation. Can switch to that


configure_logging(log_path)

log = structlog.get_logger()


class DoorBoltStatus:
Expand Down Expand Up @@ -58,13 +105,13 @@ def check_match(self, topic, data):
self.queue.put((topic, data))
return match
except Exception as e:
log_mqtt.warning('MesssageWaiter: Exception match check: {}'.format(e))
log.warning('MesssageWaiter: Exception match check: {}'.format(e))

def mqtt_message_received(client, u, message):
try:
mqtt_handle_message(client, u, message)
except Exception as e:
log_mqtt.exception('Failed to handle message %: %s '.format(message.topic, message.payload))
log.exception('Failed to handle message %: %s '.format(message.topic, message.payload))

def door_id_from_mqtt(prefix):
door_id = None
Expand All @@ -73,17 +120,24 @@ def door_id_from_mqtt(prefix):
door_id = d
return door_id


def elide(data, length=30, suffix=b'...'):
return (data[:length] + suffix) if len(data) > length else data

def mqtt_handle_message(client, u, message):
log_mqtt.debug('received {}: {}'.format(message.topic, message.payload))

start_time = time.time()
matches = []
door = door_from_topic(message.topic)

if message.topic == 'fbp':
# Heartbeat messages
d = message.payload.decode('utf8')
m = json.loads(d)

device = m['payload']['role']
door = door_id_from_mqtt(device)
t = time.time()
log_mqtt.debug('saw device {} at {}'.format(device, t))
m['time_received'] = t

# Persist so we can query for device status
Expand All @@ -105,24 +159,35 @@ def mqtt_handle_message(client, u, message):

else:
# Check responses
matches = []
for waiter in mqtt_message_waiters:
m = waiter.check_match(message.topic, message.payload.decode('utf8'))
if m:
matches.append(m)
if len(matches) == 0:
log_mqtt.debug('No matches for message on: {}. {} waiters'.format(message.topic, len(mqtt_message_waiters)))

end_time = time.time()

duration = 1000.0*(end_time-start_time)
payload = elide(message.payload)

log.info('mqtt-receive',
door=door,
topic=message.topic,
payload=payload,
duration_ms=duration,
matches=len(matches),
waiters=len(mqtt_message_waiters),
)


def mqtt_subscribed(client, u, mid, granted_qos):
log_mqtt.info('subscribed')
log.info('mqtt-subscribed', mid=mid)

def mqtt_disconnected(client, u, rc):
log_mqtt.info('disconnected: {}'.format(rc))
log.info('mqtt-disconnected', rc=rc)
# client automatically handles reconnect

def mqtt_connected(client, u, f, rc):
log_mqtt.info('connected')
log.info('mqtt-connected', rc=rc)
subscriptions = [
('fbp', 0),
]
Expand All @@ -135,7 +200,6 @@ def mqtt_connected(client, u, f, rc):
subscriptions.append((topic, 0))

client.subscribe(subscriptions)
log_mqtt.info('subscribe()')

def setup_mqtt_client():
broker_url = os.environ.get('MSGFLO_BROKER', 'mqtt://localhost')
Expand All @@ -147,7 +211,6 @@ def setup_mqtt_client():
client.on_subscribe = mqtt_subscribed

client.connect(host, port, 60)
log_mqtt.info('connect() done')
return client

def create_mqtt_client(broker_url):
Expand Down Expand Up @@ -178,7 +241,10 @@ def mqtt_send(topic, payload):

client = mqtt_client
client.publish(topic, payload)
log_mqtt.debug('sent {}: {}'.format(topic, payload))

door = door_from_topic(topic)
# FIXME: also provide request_id
log.info('mqtt-send', door=door, topic=topic, payload=payload)

def seen_since(messages, time : float):
devices = {}
Expand Down Expand Up @@ -269,11 +335,81 @@ def __init__(self, mqtt_prefix, bolt_sensor=False):
}
api_users = {}


def door_from_path(path):
# /doors/virtual-1/unlock?...
if not path.startswith('/doors'):
return None

tok = path.split('/')
door = tok[2]
return door

def door_from_topic(topic):
# doors/device-100/unlock
#m = re.match(r"doors\/(.*)\/.*")
tok = topic.split('/')
if len(tok) <= 2:
return None
prefix = '/'.join(tok[0:2])
door = door_id_from_mqtt(prefix)
return door

def request_log_params():
r, g = flask.request, flask.g

user = None if not r.authorization else r.authorization.username
query = f'{r.path}?{r.query_string.decode("utf-8")}'
door = door_from_path(r.path)

log_params = dict(
request_id=g.request_id,
method=r.method,
path=query,
user=user,
door=door,
)
return log_params


## Logging helpers
def log_request(sender, **extra):
r, g = flask.request, flask.g

# add info
g.request_id = r.headers.get('X-Request-Id', str(uuid.uuid4()))
g.request_start_time = time.time()

log_params = request_log_params()
log_params.update(dict(
))

log.info('http-request-start', **log_params)

def log_response(sender, response, **extra):
r, g = flask.request, flask.g

g.request_end_time = time.time()
duration = 1000.0 * (flask.g.request_end_time - flask.g.request_start_time)

log_params = request_log_params()
log_params.update(dict(
status_code=response.status_code,
duration_ms=duration,
))

log.info('http-request-end', **log_params)
Comment on lines +375 to +401
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these could push at least the request path to a thread local dict that would be useful. makes it easier to correlate stuff. https://www.structlog.org/en/stable/thread-local.html

If it could add a "request_id" field which is just an uuid we can trace all logging from a specific request which is also nice.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

request path and request_id is included, see the request_log_param() helper. Though not all logged events have it yet. Will try to use the threadlocal mechanism instead of doing it explicitly, to get it everywhere



flask.request_started.connect(log_request, app)
flask.request_finished.connect(log_response, app)


## System functionality
# No auth
@app.route('/')
def index():
return 'UnlockOslo device gateway'
return 'UnlockOslo device gateway'


@app.route('/status')
Expand Down Expand Up @@ -457,8 +593,8 @@ def main():

port = os.environ.get('PORT', 5000)
ip = os.environ.get('INTERFACE', '127.0.0.1')
server = gevent.pywsgi.WSGIServer((ip, port), app)
log.info('Gateway running on {}:{}'.format(ip, port))
server = gevent.pywsgi.WSGIServer((ip, port), app, log=None)
log.info('started', ip=ip, port=port)
server.serve_forever()

if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions gateway/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
flask>=0.12
gevent>=1.2
paho-mqtt>=1.3
structlog>=19.2.0