diff --git a/aggregate-access-log b/aggregate-access-log index e17b621..e06de45 100755 --- a/aggregate-access-log +++ b/aggregate-access-log @@ -1,5 +1,40 @@ #!/usr/bin/env awk -f +BEGIN { + # derive class from matching hostname against regex patterns provided from environment variable + # export ACCESS_CLASSES='[["clj", "^clj-fe-"], ["app-topic", "app-topic.*$"]]' + # OR via mol-config + # config: + # /: + # ACCESS_CLASSES: + # - [ clj, '(clj-|bauhaus-)' ] + # - [ cc, '(cc-|cc[d])' ] + + class="default" + "hostname" | getline hostname + while("echo \"$ACCESS_CLASSES\" | jq -r '.[] | .[0] +\" \"+.[1]'" | getline) { + _regex=$2 + _class=$1 + #printf "Checking if %s matches %s to give %s\n", hostname, $_regex, $_class > "/dev/stderr" + if (hostname ~ $_regex) { + #printf ">> %s matches %s to give %s\n", hostname, $_regex, $_class > "/dev/stderr" + class=$_class + break; + } + } + + if (match(hostname, /-(\w+)[0-9]+\./, results)) { + #printf "Cluster for host %s found %s \n", hostname, results[1] > "/dev/stderr" + cluster=results[1] + } else { + #printf "cluster not found for %s\n", hostname > "/dev/stderr" + cluster=null + } + + printf "METRIC ns=aggregatelogger.start host=%s class=%s cluster=%s\n", hostname, class, cluster > "/dev/stderr" +} + + function top_array(result, top, acc) { delete temp c = 0 @@ -30,10 +65,8 @@ function print_ua(ts, acc) { for (i in tops) { ua = tops[i][2] gsub("\"", "'", ua) - printf "%s METRIC ns=fe.access.ua count=%d ua=\"%s\"\n", ts, tops[i][1], ua - + printf "%s ns=fe.access.ua class="%s" cluster="%s" count=%d ua=\"%s\"\n", ts, class, cluster, tops[i][1], ua } - delete tops } @@ -46,7 +79,7 @@ function print_errors(ts, acc) { split(value, values, " ") code = values[1]+0 uri = values[2] - printf "%s METRIC ns=fe.access.errors count=%d error=%d uri=\"%s\"\n", ts, tops[i][1], code, uri + printf "%s ns=fe.access.errors class="%s" cluster="%s" count=%d error=%d uri=\"%s\"\n", ts, class, cluster, tops[i][1], code, uri } @@ -68,13 +101,12 @@ 
function print_reqs(ts, acc) { split(value, values, " ") code = values[1]+0 uri = values[2] - printf "%s METRIC ns=fe.access.slow count=%d total=%d code=%d uri=\"%s\"\n", ts, count, total, code, uri + printf "%s ns=fe.access.slow class="%s" cluster="%s" count=%d total=%d code=%d uri=\"%s\"\n", ts, class, cluster, count, total, code, uri } delete tops - top_array(tops, 15, acc["reqs"]) for (i in tops) { @@ -85,8 +117,7 @@ function print_reqs(ts, acc) { split(value, values, " ") code = values[1]+0 uri = values[2] - printf "%s METRIC ns=fe.access.count count=%d total=%d code=%d uri=\"%s\"\n", ts, count, total, code, uri - + printf "%s ns=fe.access.count class="%s" cluster="%s" count=%d total=%d code=%d uri=\"%s\"\n", ts, class, cluster, count, total, code, uri } delete tops @@ -95,17 +126,20 @@ function print_reqs(ts, acc) { function print_groups(ts, acc) { for (i in acc["times"]) { - printf "%s METRIC ns=fe.access.group group_name=\"%s\" count=%d avg=%.1f max=%d min=%d\n", ts, i, acc["count"][i], acc["times"][i]/acc["count"][i], acc["max"][i], acc["min"][i] + printf "%s ns=fe.access.group class="%s" cluster="%s" group_name=\"%s\" count=%d avg=%.1f max=%d min=%d\n", ts, class, cluster, i, acc["count"][i], acc["times"][i]/acc["count"][i], acc["max"][i], acc["min"][i] } } function print_codes(ts, acc) { for (i in acc["code"]) { - printf "%s METRIC ns=fe.access.bots response_code=%s count=%d bots=%d\n", ts, i, acc["code"][i], acc["bots"][i] + printf "%s ns=fe.access.bots class="%s" cluster="%s" response_code=%s count=%d bots=%d\n", ts, class, cluster, i, acc["code"][i], acc["bots"][i] + } } function print_acc(ts, acc) { + #DEBUG ONLY + #printf "%s times=%s class="%s" cluster="%s" code=%s ua=%s reqs=%s count=%s size=%s\n ", ts, class, cluster, length(acc["times"]), length(acc["code"]), length(acc["ua"]), length(acc["reqs"]), length(acc["count"]), length(acc["size"]) > "/dev/stderr" # in case we lose the next few bytes if network connection is lost, we just lose empty 
lines for (i = 1; i <= 10; i++) { @@ -118,29 +152,59 @@ function print_acc(ts, acc) { if (length(acc["errors"]) > 0) { print_errors(ts, acc["errors"]) } if (length(acc["reqs"]) > 0) { print_reqs(ts, acc) } + print ts > "/dev/stderr" # not 100% sure what this is for. Possibly originally planned to flush buffer but it goes to STDERR which doesn't make sense. Leaving incase removing breaks it. At least it shows that the process is not hung. - print ts > "/dev/stderr" + #printf "%s times=%s code=%s", ts, length(acc["times"]), length(acc["code"]) > "/dev/stderr" fflush(stdout) } + { + # DEBUG ONLY + # printf "line is '%s'\n", $0 > "/dev/stderr" - ts = sprintf("%s%s", substr($4, 2, 19), substr($4, 25, 5)) - current_minute = substr(ts, 1, 16) + # Known log formats + # ::ffff:10.251.203.252 - - [15/Mar/2018:11:50:55 +0000] "GET /api/apptopics HTTP/1.1" 200 40331 "http://app-topics.int.mol.dmgt.net/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Ge + # 10.251.198.10 - - [2018-03-15T14:46:11.946+0000] "HEAD /home/index.html HTTP/1.1" 200 0 579 "-" "Varnish Health Probe" - if (current_minute != last_minute) { + # ip ? ? 
time method url proto code length restime(opt) referrer ua + if (match($0, /([^ ]+)\s([^ ]+)\s([^ ]+)\s\[([^\]]+)\]\s\"(\w+) ([^ ]+)\s([^ ]+)"\s([0-9]+)\s([0-9]+)\s(([0-9]+)\s)?"([^ ]+)"\s"(.*)"/, results)) { + process_line(results) - if (length(first_timestamp)>0) { + } else { + printf "Invalid line '%s'\n", $0 > "/dev/stderr"; + } +} + +function process_line(results) { + _time=results[4] + _url=results[6] + _proto=results[7] + _code=results[8] + _size=results[9] + _duration=results[11] + _ua=results[13] + + # Known time formats + #[2018-03-15T14:46:11.946+0000] + #[15/Mar/2018:11:50:55 +0000] + ts = _time + match(_time, /(.*[0-9]{2}:[0-9]{2}):[0-9]{2}/, _time_results); //find string reprepresenting time to nearest min + current_minute = _time_results[1] + + # output aggregations if we have passed prior recorded minute + if (current_minute != last_minute) { + if (length(first_timestamp)>0) { print_acc(first_timestamp, acc) } - first_timestamp = ts last_minute = current_minute; delete acc } - url = $6 + + url = _url if (gsub("^/textbased/.*", "textbased", url) || gsub(".*article-[0-9]*/amp/.*", "amp/articles", url) || @@ -171,12 +235,11 @@ function print_acc(ts, acc) { gsub("^/.*$", "others", url)) {} - acc["code"][$8] +=1 + acc["code"][_code] +=1 - - response_time = $10+0 + response_time = _duration+0 acc["count"][url] += 1 - acc["size"][url] += $9 + acc["size"][url] += _size acc["times"][url] += response_time if (length(acc["min"][url]) == 0 || acc["min"][url] > response_time) { acc["min"][url] = response_time @@ -185,19 +248,15 @@ function print_acc(ts, acc) { acc["max"][url] = response_time } - a="" - for (i=12;i<=NF;i++) { - a=a " " $i - } - ua = substr(a,3,length(a)-3) + ua = _ua acc["ua"][ua] += 1 IGNORECASE = 1 if (match(ua, /bot|google|crawler|spider|robot|crawling|wget|http|slurp|analyzer|sitecon|@/) || ua == "-") { - acc["bots"][$8] += 1 + acc["bots"][_code] += 1 } - code = $8+0 - uri = $6 + code = _code+0 + uri = _url gsub("\\?.*", "", uri) code_uri = 
sprintf("%03d %s", code, uri) acc["reqs"][code_uri] += 1 @@ -207,10 +266,11 @@ function print_acc(ts, acc) { acc["errors"][code_uri] += 1 } - - + # DEBUG ONLY + # printf "_time=%s _url=%s _proto=%s _code=%s _size=%s _duration=%s ts=%s ua=%s code_uri=%s \n", _time, _url, _proto, _code, _size, _duration, ts, ua, code_uri > "/dev/stderr" } + END { print_acc(first_timestamp, acc) diff --git a/forwarder b/forwarder index 80eef8f..fcff401 100755 --- a/forwarder +++ b/forwarder @@ -1,7 +1,12 @@ #!/usr/bin/env python from __future__ import print_function -import ConfigParser + +try: + from configparser import ConfigParser # Python 3 import +except ImportError: + from ConfigParser import ConfigParser # If ConfigParser missing, we're on Py3, import Py2 as Py3 name + import os import optparse import sys diff --git a/offsets b/offsets new file mode 100644 index 0000000..2a70637 --- /dev/null +++ b/offsets @@ -0,0 +1 @@ +b'bd4a6765a4aa9dd566cbe16e2d8ac81e' 11612437 /Users/gopi.thumati/lem-logs/log diff --git a/send_to_es b/send_to_es index 8e9f0ee..79bd525 100755 --- a/send_to_es +++ b/send_to_es @@ -1,13 +1,24 @@ #!/usr/bin/env python -import sys, json, os, urllib2, datetime, re, Queue, threading, time, optparse +import sys, json, os, datetime, re, threading, time, optparse +try: + import queue # Python 3 import +except ImportError: + import Queue as queue # If queue missing, we're on Py2, import Py2 as Py3 name + +try: + # For Python 3.0 and later + import urllib.request as urlopen +except ImportError: + # Fall back to Python 2's urllib2 + import urllib2 as urlopen class ExitToken: - pass + pass exit_token = ExitToken() -q = Queue.Queue(1000) +q = queue.Queue(1000) hostname = os.popen("hostname").read().strip() @@ -22,64 +33,64 @@ options, args = parser.parse_args() es_url = args[0] def to_event(event): - try: - match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) - if match: - timestamp = match.group(1) + match.group(2)[0:3] - 
timestamp += match.group(3) if match.group(3) else "+0000" - if timestamp[10] == ' ': - timestamp = timestamp[0:10] + 'T' + timestamp[11:] - else: - timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) - except ValueError, e: - timestamp = event['time'] - data = { - "@timestamp": timestamp, - "host": options.hostname, - "message": event_line(event['line'].strip()), - } - return json.dumps(data) + try: + match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) + if match: + timestamp = match.group(1) + match.group(2)[0:3] + timestamp += match.group(3) if match.group(3) else "+0000" + if timestamp[10] == ' ': + timestamp = timestamp[0:10] + 'T' + timestamp[11:] + else: + timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) + except ValueError as e: + timestamp = event['time'] + data = { + "@timestamp": timestamp, + "host": options.hostname, + "message": event_line(event['line'].strip()), + } + return json.dumps(data) def starts_with_space(line): - return len(line) > options.offset and line[options.offset] in [' ', '\t'] + return len(line) > options.offset and line[options.offset] in [' ', '\t'] def event_line(line): - return line[options.offset:] if options.cut else line + return line[options.offset:] if options.cut else line def sending(): - running = True - lastEvent = None - event = None - while event != exit_token: - count = 0; - payload = '' - while count < 1000 and event != exit_token and (not q.empty() or count == 0): - try: - event = q.get(True, 1) - except Queue.Empty, e: - event = None - if event and event != exit_token and starts_with_space(event['line']): - lastEvent['line'] += "\n" + event_line(event['line']) - else: - if lastEvent: - payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n" - count += 1 - lastEvent = None - if not event: - break - lastEvent = event - - if count > 0: - # print 
"----------------------" - # print payload[0:-1] - while True: - try: - urllib2.urlopen("%s/_bulk" % es_url, payload) - break - except URLError, e: - print >> sys.stderr, "Failed sending bulk to ES: %s" % str(e) - sleep(3) + running = True + lastEvent = None + event = None + while event != exit_token: + count = 0 + payload = '' + while count < 1000 and event != exit_token and (not q.empty() or count == 0): + try: + event = q.get(True, 1) + except queue.Empty as e: + event = None + if event and event != exit_token and starts_with_space(event['line']): + lastEvent['line'] += "\n" + event_line(event['line']) + else: + if lastEvent: + payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n" + count += 1 + lastEvent = None + if not event: + break + lastEvent = event + + if count > 0: + # print "----------------------" + # print payload[0:-1] + while True: + try: + urlopen.urlopen("%s/_bulk" % es_url, payload) + break + except URLError as e: + print >> sys.stderr, "Failed sending bulk to ES: %s" % str(e) + time.sleep(3) t = threading.Thread(target=sending) @@ -87,16 +98,16 @@ t.daemon = True t.start() try: - while 1: - line = sys.stdin.readline() - if not line: - q.put(exit_token) - break - line = line[0:-1] - if len(line) > 0: - q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) - # q.put(to_event(line)) - t.join() -except KeyboardInterrupt, e: - pass + while 1: + line = sys.stdin.readline() + if not line: + q.put(exit_token) + break + line = line[0:-1] + if len(line) > 0: + q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) + # q.put(to_event(line)) + t.join() +except KeyboardInterrupt as e: + pass diff --git a/send_to_rabbitmq b/send_to_rabbitmq index a662460..94c1f1e 100755 --- a/send_to_rabbitmq +++ b/send_to_rabbitmq @@ -1,13 +1,29 @@ #!/usr/bin/env python -import sys, json, os, urllib, urllib2, datetime, re, Queue, threading, time, optparse, base64 +import sys, json, os, datetime, re, threading, time, optparse, base64 +try: + 
import urllib.parse as quote # Python 3+ +except ImportError: + import urllib as quote # Python 2.X + +try: + # For Python 3.0 and later + import urllib.request as urlopen +except ImportError: + # Fall back to Python 2's urllib2 + import urllib2 as urlopen + +try: + import queue # Python 3 import +except ImportError: + import queue as queue # If queue missing, we're on Py2, import Py2 as Py3 name class ExitToken: - pass + pass exit_token = ExitToken() -q = Queue.Queue(1000) +q = queue.Queue(1000) hostname = os.popen("hostname").read().strip() @@ -23,13 +39,13 @@ parser.add_option('-u', '--credentials USERNAME:PASSWORD', action="store", dest= options, args = parser.parse_args() if not options.exchange: - parser.error('Exchange not given') + parser.error('Exchange not given') if len(args) == 0: - parser.error('RabbitMQ URL not given') + parser.error('RabbitMQ URL not given') -encoded_vhost = urllib.quote(options.vhost, safe='') -encoded_exchange = urllib.quote(options.exchange, safe='') +encoded_vhost = quote(options.vhost, safe='') +encoded_exchange = quote(options.exchange, safe='') rabbitmq_url = args[0] @@ -38,85 +54,83 @@ base64string = base64.standard_b64encode('%s:%s' % (cred[0], cred[1])).replace(' uri = "%s/api/exchanges/%s/%s/publish" % (rabbitmq_url, encoded_vhost, encoded_exchange) - def to_event(event): - try: - match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) - if match: - timestamp = match.group(1) + match.group(2)[0:3] - timestamp += match.group(3) if match.group(3) else "+0000" - if timestamp[10] == ' ': - timestamp = timestamp[0:10] + 'T' + timestamp[11:] - else: - timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) - except ValueError, e: - timestamp = event['time'] - data = { - "@timestamp": timestamp, - "host": options.hostname, - "message": event['line'].strip(), - } - return json.dumps(data) - + try: + match = 
re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) + if match: + timestamp = match.group(1) + match.group(2)[0:3] + timestamp += match.group(3) if match.group(3) else "+0000" + if timestamp[10] == ' ': + timestamp = timestamp[0:10] + 'T' + timestamp[11:] + else: + timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) + except ValueError as e: + timestamp = event['time'] + data = { + "@timestamp": timestamp, + "host": options.hostname, + "message": event['line'].strip(), + } + return json.dumps(data) def starts_with_space(line): - return len(line) > options.offset and line[options.offset] in [' ', '\t'] + return len(line) > options.offset and line[options.offset] in [' ', '\t'] def sending(): - running = True - lastEvent = None - event = None - while event != exit_token: - count = 0; - payload = '' - while count < 1 and event != exit_token and (not q.empty() or count == 0): - try: - event = q.get(True, 1) - except Queue.Empty, e: - event = None - if event and event != exit_token and starts_with_space(event['line']): - lastEvent['line'] += "\n" + event['line'] - else: - if lastEvent: - payload += json.dumps({ - "properties":{}, - "routing_key":options.routing_key, - "payload":to_event(lastEvent), - "payload_encoding":"string"} - ) - # payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n" - count += 1 - lastEvent = None - if not event: - break - lastEvent = event - - if count > 0: - request = urllib2.Request(uri) - request.add_header("Authorization", "Basic %s" % base64string) - # print "----------------------" - # print "sending to: %s" % uri - # print payload[0:-1] - - urllib2.urlopen(request, payload) - # print "done" - - # urllib2.urlopen(uri, payload) + running = True + lastEvent = None + event = None + while event != exit_token: + count = 0 + payload = '' + while count < 1 and event != exit_token and (not q.empty() or count == 0): + try: + event = q.get(True, 1) 
+ except queue.Empty as e: + event = None + if event and event != exit_token and starts_with_space(event['line']): + lastEvent['line'] += "\n" + event['line'] + else: + if lastEvent: + payload += json.dumps({ + "properties":{}, + "routing_key":options.routing_key, + "payload":to_event(lastEvent), + "payload_encoding":"string"} + ) + # payload += "{\"index\": {}}\n" + to_event(lastEvent) + "\n" + count += 1 + lastEvent = None + if not event: + break + lastEvent = event + + if count > 0: + request = urlopen.Request(uri) + request.add_header("Authorization", "Basic %s" % base64string) + # print "----------------------" + # print "sending to: %s" % uri + # print payload[0:-1] + + urlopen.urlopen(request, payload) + # print "done" + + # urlopen.urlopen(uri, payload) t = threading.Thread(target=sending) t.daemon = True t.start() try: - while 1: - line = sys.stdin.readline() - if not line: - q.put(exit_token) - break - line = line[0:-1] - if len(line) > 0: - q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) - t.join() -except KeyboardInterrupt, e: - pass + while 1: + line = sys.stdin.readline() + if not line: + q.put(exit_token) + break + line = line[0:-1] + if len(line) > 0: + q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) + t.join() +except KeyboardInterrupt as e: + pass diff --git a/send_to_stomp b/send_to_stomp index f7968c7..daf3996 100755 --- a/send_to_stomp +++ b/send_to_stomp @@ -1,16 +1,20 @@ #!/usr/bin/env python -from __future__ import print_function -import sys, json, os, datetime, re, Queue, threading, time, optparse, base64 +import sys, json, os, datetime, re, threading, time, optparse, base64 +try: + import queue # Python 3 import +except ImportError: + import queue as queue # If queue missing, we're on Py2, import Py2 as Py3 name + from select import select from stompclient import StompClient import sys class ExitToken: - pass + pass exit_token = ExitToken() -q = Queue.Queue(1000) +q = queue.Queue(1000) hostname = 
os.popen("hostname").read().strip() @@ -28,27 +32,26 @@ parser.add_option('-b', '--heartbeat INTERVAL', action="store", dest="heartbeat" options, args = parser.parse_args() if not options.exchange: - parser.error('Exchange not given') + parser.error('Exchange not given') if len(args) == 0: - parser.error('Stomp host:port not given') + parser.error('Stomp host:port not given') def eprint(*args, **kwargs): - print(*args, file=sys.stderr, **kwargs) + print(*args, file=sys.stderr, **kwargs) def log_json(name, props={}): - a = { - 'timestamp': datetime.datetime.now().isoformat(), - 'ns': 'forwarder.stomp', - 'hostname': hostname, - 'name': name - } - z = props.copy() - z.update(a) - eprint(json.dumps(z)) - - -log_json('starting', dict([(property, value) for property, value in vars(options).iteritems()])) + a = { + 'timestamp': datetime.datetime.now().isoformat(), + 'ns': 'forwarder.stomp', + 'hostname': hostname, + 'name': name + } + z = props.copy() + z.update(a) + eprint(json.dumps(z)) + +log_json('starting', dict([(property, value) for property, value in vars(options).items()])) stomp_address = args[0].split(":") @@ -57,81 +60,79 @@ stomp = StompClient(stomp_address[0], int(stomp_address[1])) cred = options.credentials.split(":") stomp.connect(cred[0], cred[1], options.vhost) - def to_event(event): - try: - match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) - if match: - timestamp = match.group(1) + match.group(2)[0:3] - timestamp += match.group(3) if match.group(3) else "+0000" - if timestamp[10] == ' ': - timestamp = timestamp[0:10] + 'T' + timestamp[11:] - else: - timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) - except ValueError, e: - timestamp = event['time'] - return { - "@timestamp": timestamp, - "host": options.hostname, - "message": event_line(event['line'].strip()), - } - + try: + match = re.match(r'(\d\d\d\d-\d\d-\d\d[ 
T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line']) + if match: + timestamp = match.group(1) + match.group(2)[0:3] + timestamp += match.group(3) if match.group(3) else "+0000" + if timestamp[10] == ' ': + timestamp = timestamp[0:10] + 'T' + timestamp[11:] + else: + timestamp = "%s%s%02d00" % (event['time'][0:-3], "+" if time.altzone <= 0 else "-", time.altzone/60/60*-1) + except ValueError as e: + timestamp = event['time'] + return { + "@timestamp": timestamp, + "host": options.hostname, + "message": event_line(event['line'].strip()), + } def starts_with_space(line): - return len(line) > options.offset and line[options.offset] in [' ', '\t'] + return len(line) > options.offset and line[options.offset] in [' ', '\t'] def event_line(line): - return line[options.offset:] if options.cut else line + return line[options.offset:] if options.cut else line def sending(): - lastEvent = None - event = None - while event != exit_token: - count = 0; - payload = '' - while count < 1 and event != exit_token and (not q.empty() or count == 0): - try: - event = q.get(True, 1) - except Queue.Empty, e: - event = None - if event and event != exit_token and starts_with_space(event['line']): - lastEvent['line'] += "\n" + event_line(event['line']) - else: - if lastEvent: - payload += json.dumps(to_event(lastEvent)) - count += 1 - lastEvent = None - if not event: - break - lastEvent = event - - if count > 0: - stomp.send(options.exchange, options.routing_key, payload) + lastEvent = None + event = None + while event != exit_token: + count = 0 + payload = '' + while count < 1 and event != exit_token and (not q.empty() or count == 0): + try: + event = q.get(True, 1) + except queue.Empty as e: + event = None + if event and event != exit_token and starts_with_space(event['line']): + lastEvent['line'] += "\n" + event_line(event['line']) + else: + if lastEvent: + payload += json.dumps(to_event(lastEvent)) + count += 1 + lastEvent = None + if not event: + break + lastEvent = event + + if 
count > 0: + stomp.send(options.exchange, options.routing_key, payload) t = threading.Thread(target=sending) t.daemon = True t.start() try: - last_heartbeat = time.time(); - while 1: - rlist, _, _ = select([sys.stdin], [], [], 0.1) - if rlist: - line = sys.stdin.readline() - if not line: - q.put(exit_token) - break - line = line[0:-1] - if len(line) > 0: - q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) - - now = time.time(); - if now - last_heartbeat >= options.heartbeat: - log_json('heartbeat') - last_heartbeat += options.heartbeat - - - t.join() -except KeyboardInterrupt, e: - pass + last_heartbeat = time.time() + while 1: + rlist, _, _ = select([sys.stdin], [], [], 0.1) + if rlist: + line = sys.stdin.readline() + if not line: + q.put(exit_token) + break + line = line[0:-1] + if len(line) > 0: + q.put({'time': datetime.datetime.now().isoformat(), 'line': line}) + + now = time.time() + if now - last_heartbeat >= options.heartbeat: + log_json('heartbeat') + last_heartbeat += options.heartbeat + + t.join() +except KeyboardInterrupt as e: + pass + diff --git a/sender b/sender index f1790f7..0c72822 100755 --- a/sender +++ b/sender @@ -7,27 +7,44 @@ import os import glob import socket import optparse -import md5 +import hashlib import zipfile import gzip import tempfile import shutil import subprocess import threading -import Queue import signal import traceback +import json DEFAULT_SIGNAGURE_LENGTH=256 -filter_stopped = Queue.Queue() -stop_signal = Queue.Queue() +try: + import queue # Python 3 import +except ImportError: + import Queue as queue # If queue missing, we're on Py2, import Py2 as Py3 name + +filter_stopped = queue.Queue() +stop_signal = queue.Queue() running_lock = threading.Lock() +debug_on = False +running = True def should_stop(): return not stop_signal.empty() +def heartbeat(): + global running + while running: + if running: + log("METRIC ns=forwarder.heartbeat result=ok"); + else: + log("METRIC ns=forwarder.heartbeat 
result=fail"); + time.sleep(3600) + def main(): + global running parser = optparse.OptionParser(usage="Usage: %prog [options] ... ", description= "Outputs the content of files resolved by the patterns passed as " "parameters and keep monitoring them for new content. " @@ -36,6 +53,7 @@ def main(): "IMPORTANT: only files with size >= signature-length (default %d) bytes will be processed. " "Zip files will be open recursively, and only once." % DEFAULT_SIGNAGURE_LENGTH) + parser.add_option('-D', '--debug', action="store_true", dest="debug", help="Enable debug output", default=False) parser.add_option('-f', '--follow', action="store_true", dest="follow", help="Pools the file very second for changes in an infinite loop.", default=False) parser.add_option('-p', '--offsets', action="store", dest="offsets", help="File to persist the last offsets read for each file. If it doesn't exist, the files are read from beginning.", default='/dev/null') parser.add_option('-t', '--tcp', action="store", dest="host", help="Sends the output to the host:port via TCP.", metavar="HOST:PORT") @@ -49,6 +67,11 @@ def main(): options, args = parser.parse_args() + if options.debug: + log("DEBUG ON") + global debug_on + debug_on = True + if options.dump_pid: f = open(options.dump_pid,'w') f.write("%d" % os.getpid()) @@ -68,7 +91,9 @@ def main(): a = options.host.split(":") try: output = Netcat(a[0], int(a[1]), options.retry_on_network_error) - except socket.error, e: + except socket.error as e: + global running + running=False log("Socket error: %s" % str(e)) sys.exit(1) @@ -86,7 +111,7 @@ def main(): try: last_output.write(line) last_output.flush() - except socket.error, e: + except socket.error as e: log("Socket error in filer: %s" % e) break except: @@ -115,11 +140,13 @@ def main(): log("That does it. 
Sending SIGINT.") os.kill(filter.pid, sig) break - except OSError, e: + except OSError as e: log("An exception happened while waiting for process to finish: %s" % e) def signal_handler(signal, frame): + global running + running = False log("%s caught" % signal_names[signal]) stop_signal.put(True) # if not filter.stdin.closed: @@ -153,6 +180,8 @@ def log(msg): sys.stderr.flush() def debug(msg): + if debug_on: + log("DEBUG: "+ msg); # sys.stderr.write(str(msg)) # sys.stderr.write("\n") # sys.stderr.flush() @@ -184,21 +213,25 @@ class Netcat(object): self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.s.connect(self.addr) self.show_error_message = True - break; - except socket.error, e: + break + except socket.error as e: self.error("Socket error: %s" % e) - except IOError, e: + except IOError as e: self.error("IOError: %s" % e) self.exit_or_retry() def error(self, msg): + global running + running=False if self.show_error_message: log(msg) def exit_or_retry(self): + global running + running = True if not self.retry_on_network_error: + running = False sys.exit(1) - self.error("Retrying every 3 seconds.") self.show_error_message = False time.sleep(3) @@ -213,9 +246,9 @@ class Netcat(object): self.last_msg = msg self.send_last = False break - except socket.error, e: + except socket.error as e: self.error("Socket error: %s" % e) - except IOError, e: + except IOError as e: self.error("IOError: %s" % e) self.send_last = True self.exit_or_retry() @@ -264,6 +297,8 @@ class Tail(object): self.starting = options.starting self.start_from_tail = options.start_from_tail self.readOffsets() + self.duplicates = {} + def persistOffsets(self): if self.offsetsfn == '/dev/null': @@ -272,7 +307,7 @@ class Tail(object): temp = '%s-temp' % self.offsetsfn f = open(temp,'w') try: - for sig, info in self.offsets.iteritems(): + for sig, info in self.offsets.items(): f.write(info.dump()) f.write('\n') f.flush() @@ -299,13 +334,13 @@ class Tail(object): f.close() def 
generateSignature(self, f): - offset = f.tell(); + offset = f.tell() header = f.read(self.signatureLength) f.seek(offset) if len(header) == 0: return None else: - return md5.new(header).hexdigest() + return hashlib.md5(header).hexdigest().encode('utf-8') def purgeOffsetsNotIn(self, existing): newOffsets = {} @@ -319,10 +354,11 @@ class Tail(object): if should_stop(): return for line in f: + if debug_on: debug("line %s" % line.replace("\n","")) if not self.starting or line >= self.starting: self.output.write(line) if should_stop(): - break; + break self.output.flush() def isCompressed(self, fn): @@ -355,10 +391,11 @@ class Tail(object): self.processZipFile(f, fn) def processGzipFile(self, fn): - debug("gz: %s" % fn) + if debug_on: debug("gz: %s" % fn) f = gzip.open(fn) try: - self.processFile(f, '/var/tmp/fake.log', {}) + sig = self.generateSignature(f) + self.processFile(f, '/var/tmp/fake.log', sig) finally: f.close() @@ -369,46 +406,41 @@ class Tail(object): shutil.rmtree(path) def processFileByName(self, fn, existing): - debug("processFileByName: %s" % fn) + if debug_on: debug("processFileByName: %s" % fn) f = open(fn, 'rb') try: self.processFile(f, fn, existing) finally: f.close() + debug("processFileByName, close file"); - def processFile(self, f, fn, existing): - debug("processFile: %s" % fn) - sig = self.generateSignature(f) - if not sig: - return - - if sig in existing and os.path.getsize(fn) != os.path.getsize(existing[sig]): - log("WARN Files '%s' and '%s' have same signature and different sizes" % (fn, existing[sig])) - + def processFile(self, f, fn, sig): info = self.offsets.get(sig, Info(sig=sig, name=fn)) + if debug_on: debug("processFile %s %s" % (fn, info.dump())) lastOffset = info.offset - info.name = fn if self.isCompressed(fn): - debug("compressed: %s" % fn) + if debug_on: debug("compressed: %s" % fn) if info.offset == 0: if not self.start_from_tail: self.processCompressedFile(f, fn) info.offset = -1 else: if self.start_from_tail: + if debug_on: 
debug("starting from tail %s" % fn) info.offsets = os.path.getsize(fn) if os.path.exists(fn) and os.path.getsize(fn) < info.offset: - log("WARN file %s was truncated" % fn) + log("METRIC ns=forwarder.truncated file=%s filesize=%s offset=%s" % (fn, os.path.getsize(fn), info.offset)) info.offset = os.path.getsize(fn) else: + if debug_on: debug("Seeking to %s in %s, currently at %s" % (info.offset, fn, f.tell())) f.seek(info.offset) self.copy(f) info.offset = f.tell() + if debug_on: debug("Setting offset for: %s to %s (info: %s)" % (fn, info.offset, info.dump())) - existing[sig] = fn if lastOffset != info.offset: self.offsets[sig] = info @@ -416,29 +448,71 @@ class Tail(object): def run(self): + global running running_lock.acquire(True) try: while not should_stop(): if not filter_stopped.empty(): sys.exit(1) existing = {} + to_process = {} + filehandles = {} for fnpattern in self.fnpatterns: for fn in glob.glob(fnpattern): + if debug_on: debug("Checking fn %s" % fn) + f = None try: if not os.path.isfile(fn): log("File no longer exists: %s" % fn) continue if os.path.getsize(fn) < self.signatureLength and not self.isCompressed(fn): + log("Skipping as file too short to generate sig or file is compressed: %s" % fn) continue - self.processFileByName(fn, existing) - except Exception, e: - log("Exception: %s" % e) + f = open(fn, 'rb') + sig = self.generateSignature(f) + if debug_on: debug("Sig for fn %s is %s" % (fn, sig)) + sig_fn = sig+fn.encode('utf-8'); + if sig in existing: + if not sig_fn in self.duplicates: + log("METRIC ns=forwarder.duplicatesig file=%s dupe_of=%s sig=%s" % (fn, existing[sig], sig)) + self.duplicates[sig_fn] = True; + if sig in to_process: #take original duplicate out of to_process + del to_process[sig] + f.close() + else: + if debug_on: debug("Adding file %s %s" % (fn, sig)) + existing[sig] = fn #leave in existing in case more than 2 duplicates + to_process[sig] = fn + filehandles[sig] = f + + except Exception as e: + log("METRIC 
ns=forwarder.error.preprocess file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", ""))) + log("Exception=\"%s\"" % e) + if f: f.close() exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_tb(exc_traceback) + if debug_on: debug("To Process %s" % to_process) + sigs = to_process.keys() + for sig in sigs: + f = None + try: + fn = to_process[sig] + if debug_on: debug("Processing file %s %s" % (fn, sig)) + f = filehandles[sig] + self.processFile(f, fn, sig) + except Exception as e: + log("METRIC ns=forwarder.error.process file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", ""))) + log("Exception=\"%s\"" % e) + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_tb(exc_traceback) + if f: f.close() + finally: + if f and not f.closed: f.close() + if len(existing) != len(self.offsets): - self.purgeOffsetsNotIn(existing) + self.purgeOffsetsNotIn(to_process) if not self.follow: break time.sleep(1) @@ -448,6 +522,8 @@ class Tail(object): if __name__ == '__main__': - main() - + heartbeat_thread = threading.Thread(target=heartbeat) + heartbeat_thread.daemon=True + heartbeat_thread.start() + main() #import pdb ; pdb.set_trace() diff --git a/tests/assert.sh b/tests/assert.sh index ffd2b95..3396bf7 100644 --- a/tests/assert.sh +++ b/tests/assert.sh @@ -102,7 +102,8 @@ assert() { # assert [stdin] (( tests_ran++ )) || : [[ -z "$DISCOVERONLY" ]] || return - expected=$(echo -ne "${2:-}") + # expected=$(echo -ne "${2:-}") + expected=${2:-} result="$(eval 2>/dev/null $1 <<< ${3:-})" || true if [[ "$result" == "$expected" ]]; then [[ -z "$DEBUG" ]] || echo -n . diff --git a/tests/test-kill-int-forwarder.sh b/tests/test-kill-int-forwarder.sh index 87ae499..cb8e314 100755 --- a/tests/test-kill-int-forwarder.sh +++ b/tests/test-kill-int-forwarder.sh @@ -5,11 +5,13 @@ uuid=$(last_uuid) sender -f 'myapp.*' -l "grep --line-buffered -v ${uuid}" > /dev/null & sender_pid=$! 
-sleep 0.1 # to avoid race condition +sleep 1 # to avoid race condition assert "ps ax | grep ${uuid} | grep -v sender | grep -v filter_wrapper | grep line-buffered | count_lines" 1 -kill -SIGINT $sender_pid +if ps -p $sender_pid > /dev/null; then + kill -2 $sender_pid +fi wait $sender_pid diff --git a/tests/test-kill-term-forwarder.sh b/tests/test-kill-term-forwarder.sh index 1b6b512..7980334 100755 --- a/tests/test-kill-term-forwarder.sh +++ b/tests/test-kill-term-forwarder.sh @@ -5,11 +5,13 @@ uuid=$(last_uuid) sender -f 'myapp.*' -l "grep --line-buffered -v ${uuid}" > /dev/null & sender_pid=$! -sleep 0.1 # to avoid race condition +sleep 1 # to avoid race condition assert "ps ax | grep ${uuid} | grep -v sender | grep -v filter_wrapper | grep line-buffered | count_lines" 1 -kill -SIGTERM $sender_pid +if ps -p $sender_pid > /dev/null; then + kill -15 $sender_pid +fi wait $sender_pid diff --git a/tests/test-opt-greater-or-equal-than.sh b/tests/test-opt-greater-or-equal-than.sh index 0ed4eaa..bc7d21d 100644 --- a/tests/test-opt-greater-or-equal-than.sh +++ b/tests/test-opt-greater-or-equal-than.sh @@ -1,5 +1,7 @@ for i in {1..10}; do log_random >> myapp.log; done +sleep 1 # to avoid race condition + log_random >> myapp.log ts=$(last_timestamp) log_random >> myapp.log diff --git a/tests/test-signature-size.sh b/tests/test-signature-size.sh index 97dce52..17df4f0 100644 --- a/tests/test-signature-size.sh +++ b/tests/test-signature-size.sh @@ -1,5 +1,14 @@ echo "AAAAAAAAAAAAAAAAAA BBBBBBBBBBBBBBB" > myapp.log echo "AAAAAAAAAAAAAAAAAA CCCCCCCCCCCCCCC" > myapp.log.1 -assert "sender -s 15 'myapp.*' | count_lines" 1 +assert "sender -s 15 'myapp.*' | count_lines" 0 assert "sender -s 30 'myapp.*' | count_lines" 2 + +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.2 +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.3 +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.4 +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.5 + +assert "sender -s 30 'myapp.*' | 
count_lines" 2 + +assert "sender -s 30 'myapp.*' 2>&1 | grep forwarder.duplicatesig | count_lines" 3