changeset 3:796ac0b50dbf
add cron.daily index cleaning
author      Carl Byington <carl@five-ten-sg.com>
date        Thu, 07 Mar 2013 10:41:01 -0800
parents     9e0cdf091b8a
children    29ffaf4e0a7f
files       logstash.conf logstash.cron logstash.rc logstash.spec logstash_index_cleaner.py
diffstat    5 files changed, 226 insertions(+), 19 deletions(-)
--- a/logstash.conf	Fri Mar 01 18:54:53 2013 -0800
+++ b/logstash.conf	Thu Mar 07 10:41:01 2013 -0800
@@ -32,6 +32,15 @@
     pattern => "%{SENDMAIL}"
     patterns_dir => "/var/lib/logstash/data/patterns"
   }
+  grep {
+    type => "sendmail"
+    match => [ "program", "sendmail" ]
+  }
+  grep {
+    type => "sendmail"
+    negate => true
+    match => [ "message", "^(M|m)ilter" ]
+  }

   grok {
     type => "linux-syslog"
@@ -40,7 +49,7 @@
   date {
     # do we need this? the above picks up SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
     type => "linux-syslog"
-    timestamp => ["MMM dd HH:mm:ss","MMM d HH:mm:ss"]
+    timestamp => ["MMM dd HH:mm:ss","MMM d HH:mm:ss"]
   }
   grok {
     type => "apache-access"
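Review note: logstash's grep filter (1.1.x) drops events that fail its match by default; with negate => true it drops events that do match. The pair added above therefore keeps only type-"sendmail" events whose program field matches "sendmail", then discards those whose message starts with "Milter"/"milter". A minimal Python sketch of the combined predicate; keep_event and the event dict are illustrative, not part of the change:

    import re

    def keep_event(event):
        # grep 1: match => [ "program", "sendmail" ] -- without negate,
        # events that do NOT match are dropped
        if not re.search(r"sendmail", event.get("program", "")):
            return False
        # grep 2: negate => true -- events that DO match are dropped
        if re.search(r"^(M|m)ilter", event.get("message", "")):
            return False
        return True

    print(keep_event({"program": "sendmail", "message": "milter accept"}))  # False
    print(keep_event({"program": "sendmail", "message": "stat=Sent"}))      # True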
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/logstash.cron	Thu Mar 07 10:41:01 2013 -0800
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+/usr/local/bin/logstash_index_cleaner.py -d 2
+
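The cron job keeps two days of daily indices. Roughly, the cutoff that logstash_index_cleaner.py -d 2 computes (same arithmetic as get_index_epoch and find_expired_indices in the new script below) is:

    import time

    # what "-d 2" means: any logstash-YYYY.MM.DD index whose date parses
    # to an epoch older than this cutoff is deleted
    days_to_keep = 2
    cutoff = (time.time() + time.altzone) - days_to_keep * 24 * 60 * 60

    # a daily index such as logstash-2013.03.05 is parsed with hour "3"
    # appended, i.e. time.mktime([2013, 3, 5, 3, 0, 0, 0, 0, 0]) < cutoff

Running the cleaner once with -n/--dry-run is a cheap way to preview what a given -d value would delete.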
--- a/logstash.rc	Fri Mar 01 18:54:53 2013 -0800
+++ b/logstash.rc	Thu Mar 07 10:41:01 2013 -0800
@@ -29,7 +29,7 @@
 CONFIGFILE=/etc/logstash/logstash.conf
 LOGFILE=/var/log/logstash/logstash.log
 JARNAME=/usr/local/bin/logstash.jar
-ARGS="-jar ${JARNAME} agent -vvv --config ${CONFIGFILE} --log ${LOGFILE} -- web --backend elasticsearch://127.0.0.1/?local"
+ARGS="-jar ${JARNAME} agent --config ${CONFIGFILE} --log ${LOGFILE} -- web --backend elasticsearch://127.0.0.1/?local"
 SCRIPTNAME=/etc/rc.d/init.d/logstash
 PIDFILE=/var/run/logstash.pid
 base=logstash
@@ -50,6 +50,11 @@
     pid=$(su logstash -c 'echo -e "'"$JAVA $ARGS"' </dev/null >'"$LOGFILE"' 2>&1 & \n echo \$!" | bash')
     echo $pid >$PIDFILE
     [ -n "$pid" ] && success $"$base startup" || failure $"$base startup"
+    # might try
+    #id
+    #exec sudo -u transmission /bin/sh - << eof
+    #id
+    #eof
 }


@@ -84,6 +89,7 @@
         [ -n "$pid" ] && pkill -HUP -u logstash -P $pid
         ;;
     status)
+        echo -n "$base "
         status -p $PIDFILE
         ;;
     *)
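Dropping -vvv just turns off debug-level logging. The quoting gymnastics in start() exist to launch the jar detached under the logstash account and capture its pid via echo $!. Ignoring the su step, a Python sketch of what that pipeline accomplishes (paths as in the script):

    import subprocess

    LOGFILE = '/var/log/logstash/logstash.log'
    log = open(LOGFILE, 'a')
    # equivalent of: $JAVA $ARGS </dev/null >$LOGFILE 2>&1 &  plus capturing $!
    proc = subprocess.Popen(
        ['java', '-jar', '/usr/local/bin/logstash.jar', 'agent',
         '--config', '/etc/logstash/logstash.conf', '--log', LOGFILE,
         '--', 'web', '--backend', 'elasticsearch://127.0.0.1/?local'],
        stdin=open('/dev/null'), stdout=log, stderr=log)
    open('/var/run/logstash.pid', 'w').write(str(proc.pid))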
--- a/logstash.spec	Fri Mar 01 18:54:53 2013 -0800
+++ b/logstash.spec	Thu Mar 07 10:41:01 2013 -0800
@@ -3,21 +3,23 @@
 %define _bindir /usr/local/bin

-Summary: A tool for managing your logs
+Summary: logstash is a tool for managing events and logs.
 Name: logstash
 Version: 1.1.9
 Release: 0

-License: new BSD
-Group: Applications/Productivity
+License: Apache 2.0
+Group: System Environment/Daemons
 URL: http://logstash.net/
 BuildArch: noarch
 Source0: https://logstash.objects.dreamhost.com/release/%{name}-%{version}-monolithic.jar
 #ource0: http://logstash.objects.dreamhost.com/release/%{name}-%{version}-flatjar.jar
-Source1: logstash.rc
+Source1: %{name}.rc
 Source2: %{name}.conf
 Source3: apache.pattern
 Source4: sendmail.pattern
-Requires: httpd java-1.7.0-openjdk
+Source5: https://logstash.jira.com/secure/attachment/12610/logstash_index_cleaner.py
+Source6: %{name}.cron
+Requires: httpd java-1.7.0-openjdk python-pip python-argparse python-ordereddict
 Requires(pre): /usr/sbin/useradd
 Requires(pre): /usr/bin/getent
 Requires(postun): /usr/sbin/userdel
@@ -27,33 +29,35 @@
 %description
-logstash tool for managing your logs
+logstash is a tool for managing events and logs. You can use it to
+collect logs, parse them, and store them for later use (like, for
+searching). Speaking of searching, logstash comes with a web interface
+for searching and drilling into all of your logs.


 %prep
-cp -p %SOURCE0 .
-cp -p %SOURCE1 .
-cp -p %SOURCE2 .
-cp -p %SOURCE3 .
-cp -p %SOURCE4 .
+true


 %build
+true


 %install
 rm -rf $RPM_BUILD_ROOT
 mkdir -p $RPM_BUILD_ROOT/var/log/%{name}
-install -D -m 640 apache.pattern $RPM_BUILD_ROOT/var/lib/%{name}/data/patterns/apache
-install -D -m 640 sendmail.pattern $RPM_BUILD_ROOT/var/lib/%{name}/data/patterns/sendmail
-install -D -m 755 %{name}.rc $RPM_BUILD_ROOT/etc/rc.d/init.d/%{name}
-install -D -m 750 %{name}-%{version}-monolithic.jar $RPM_BUILD_ROOT/%{_bindir}/%{name}.jar
-#nstall -D -m 750 %{name}-%{version}-flatjar.jar $RPM_BUILD_ROOT/%{_bindir}/%{name}.jar
-install -D -m 640 %{name}.conf $RPM_BUILD_ROOT/etc/%{name}/%{name}.conf
+install -D -m 750 %SOURCE0 $RPM_BUILD_ROOT/%{_bindir}/%{name}.jar
+install -D -m 750 %SOURCE5 $RPM_BUILD_ROOT/%{_bindir}/logstash_index_cleaner.py
+install -D -m 755 %SOURCE1 $RPM_BUILD_ROOT/etc/rc.d/init.d/%{name}
+install -D -m 755 %SOURCE6 $RPM_BUILD_ROOT/etc/cron.daily/%{name}
+install -D -m 640 %SOURCE2 $RPM_BUILD_ROOT/etc/%{name}/%{name}.conf
+install -D -m 640 %SOURCE3 $RPM_BUILD_ROOT/var/lib/%{name}/data/patterns/apache
+install -D -m 640 %SOURCE4 $RPM_BUILD_ROOT/var/lib/%{name}/data/patterns/sendmail



 %pre
 /usr/bin/getent passwd %{name} >/dev/null || /usr/sbin/useradd -r -d /var/lib/%{name} -M -c "%{name} pseudo-user" %{name} >/dev/null
+pip-python install pyes || :



 %post
@@ -85,6 +89,7 @@
 %config(noreplace) %attr(0750,%{name},root) /etc/%{name}
 %config(noreplace) %attr(0640,%{name},root) /etc/%{name}/%{name}.conf
 /etc/rc.d/init.d/%{name}
+/etc/cron.daily/%{name}
 %dir %attr(0750,%{name},root) /var/log/%{name}
 %dir %attr(0750,%{name},root) /var/lib/%{name}
 %dir %attr(0750,%{name},root) /var/lib/%{name}/data
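One caveat with the new %pre: pip-python install pyes || : swallows any pip failure (no network at install time, say), so a missing pyes module only surfaces when the cron job first runs. A quick post-install check on the target host, written against nothing but the module name:

    # sanity check after installing the rpm; pyes is the only dependency
    # fetched outside the rpm database
    try:
        import pyes
    except ImportError:
        raise SystemExit('pyes is missing; /etc/cron.daily/logstash will fail')
    print('pyes import ok')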
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/logstash_index_cleaner.py	Thu Mar 07 10:41:01 2013 -0800
@@ -0,0 +1,183 @@
+#!/usr/bin/env python
+#
+# Deletes all indices with a datestamp older than "days-to-keep" for daily indices;
+# if you have hourly indices, it will delete all of those older than "hours-to-keep".
+#
+# This script presumes an index is named typically, e.g. logstash-YYYY.MM.DD
+# It will work with any name-YYYY.MM.DD or name-YYYY.MM.DD.HH type sequence
+#
+# Requires python and the following dependencies (all pip/easy_installable):
+#
+# pyes (python elasticsearch bindings, which might need simplejson)
+# argparse (built-in in python 2.7 and higher; python 2.6 and lower will have to easy_install it)
+#
+# TODO: Proper logging instead of just print statements, being able to configure a decent logging level.
+# Unit tests. The code is somewhat broken up into logical parts that may be tested separately.
+# Better error reporting?
+# Improve the get_index_epoch method to parse more date formats. Consider renaming (to "parse_date_to_timestamp"?)

+import sys
+import time
+import argparse
+from datetime import timedelta

+import pyes


+__version__ = '0.1.2'


+def make_parser():
+    """ Creates an ArgumentParser to parse the command line options. """
+    parser = argparse.ArgumentParser(description='Delete old logstash indices from Elasticsearch.')

+    parser.add_argument('-v', '--version', action='version', version='%(prog)s '+__version__)

+    parser.add_argument('--host', help='Elasticsearch host.', default='localhost')
+    parser.add_argument('--port', help='Elasticsearch port', default=9200, type=int)
+    parser.add_argument('-t', '--timeout', help='Elasticsearch timeout', default=30, type=int)

+    parser.add_argument('-p', '--prefix', help='Prefix for the indices. Indices that do not have this prefix are skipped.', default='logstash-')
+    parser.add_argument('-s', '--separator', help='Time unit separator', default='.')

+    parser.add_argument('-H', '--hours-to-keep', action='store', help='Number of hours to keep.', type=int)
+    parser.add_argument('-d', '--days-to-keep', action='store', help='Number of days to keep.', type=int)
+    parser.add_argument('-g', '--disk-space-to-keep', action='store', help='Disk space to keep (GB).', type=float)

+    parser.add_argument('-n', '--dry-run', action='store_true', help='If true, does not perform any changes to the Elasticsearch indices.', default=False)

+    return parser


+def get_index_epoch(index_timestamp, separator='.'):
+    """ Gets the epoch of the index.

+    :param index_timestamp: A string in the format YYYY.MM.DD[.HH]
+    :return: The creation time (epoch) of the index.
+    """
+    year_month_day_optionalhour = index_timestamp.split(separator)
+    if len(year_month_day_optionalhour) == 3:
+        year_month_day_optionalhour.append('3')

+    return time.mktime([int(part) for part in year_month_day_optionalhour] + [0,0,0,0,0])


+def find_expired_indices(connection, days_to_keep=None, hours_to_keep=None, separator='.', prefix='logstash-', out=sys.stdout, err=sys.stderr):
+    """ Generator that yields expired indices.

+    :return: Yields tuples in the format ``(index_name, expired_by)`` where index_name
+        is the name of the expired index and expired_by is the number of seconds (a float value) that the
+        index was expired by.
+    """
+    utc_now_time = time.time() + time.altzone
+    days_cutoff = utc_now_time - days_to_keep * 24 * 60 * 60 if days_to_keep is not None else None
+    hours_cutoff = utc_now_time - hours_to_keep * 60 * 60 if hours_to_keep is not None else None

+    for index_name in sorted(set(connection.get_indices().keys())):
+        if not index_name.startswith(prefix):
+            print >> out, 'Skipping index due to missing prefix {0}: {1}'.format(prefix, index_name)
+            continue

+        unprefixed_index_name = index_name[len(prefix):]

+        # find the timestamp parts (i.e. ['2011', '01', '05'] from '2011.01.05') using the configured separator
+        parts = unprefixed_index_name.split(separator)

+        # perform some basic validation
+        if len(parts) < 3 or len(parts) > 4 or not all([item.isdigit() for item in parts]):
+            print >> err, 'Could not find a valid timestamp from the index: {0}'.format(index_name)
+            continue

+        # find the cutoff. if we have more than 3 parts in the timestamp, the timestamp includes the hours and we
+        # should compare it to the hours_cutoff, otherwise, we should use the days_cutoff
+        cutoff = hours_cutoff
+        if len(parts) == 3:
+            cutoff = days_cutoff

+        # but the cutoff might be none, if the current index only has three parts (year.month.day) and we're only
+        # removing hourly indices:
+        if cutoff is None:
+            print >> out, 'Skipping {0} because it is of a type (hourly or daily) that I\'m not asked to delete.'.format(index_name)
+            continue

+        index_epoch = get_index_epoch(unprefixed_index_name)

+        # if the index is older than the cutoff
+        if index_epoch < cutoff:
+            yield index_name, cutoff-index_epoch

+        else:
+            print >> out, '{0} is {1} above the cutoff.'.format(index_name, timedelta(seconds=index_epoch-cutoff))

+def find_overusage_indices(connection, disk_space_to_keep, separator='.', prefix='logstash-', out=sys.stdout, err=sys.stderr):
+    """ Generator that yields over-usage indices.

+    :return: Yields tuples in the format ``(index_name, 0)`` where index_name
+        is the name of the expired index. The second element is only here for
+        compatibility reasons.
+    """

+    disk_usage = 0.0
+    disk_limit = disk_space_to_keep * 2**30

+    for index_name in reversed(sorted(set(connection.get_indices().keys()))):

+        if not index_name.startswith(prefix):
+            print >> out, 'Skipping index due to missing prefix {0}: {1}'.format(prefix, index_name)
+            continue

+        index_size = connection.status(index_name).get('indices').get(index_name).get('index').get('primary_size_in_bytes')
+        disk_usage += index_size

+        if disk_usage > disk_limit:
+            yield index_name, 0
+        else:
+            print >> out, 'keeping {0}, disk usage is {1:.3f} GB and disk limit is {2:.3f} GB.'.format(index_name, disk_usage/2**30, disk_limit/2**30)

+def main():
+    start = time.time()

+    parser = make_parser()
+    arguments = parser.parse_args()

+    if not arguments.hours_to_keep and not arguments.days_to_keep and not arguments.disk_space_to_keep:
+        print >> sys.stderr, 'Invalid arguments: You must specify either the number of hours or days to keep, or the maximum disk space to use'
+        parser.print_help()
+        return

+    connection = pyes.ES('{0}:{1}'.format(arguments.host, arguments.port), timeout=arguments.timeout)

+    if arguments.days_to_keep:
+        print 'Deleting daily indices older than {0} days.'.format(arguments.days_to_keep)
+        expired_indices = find_expired_indices(connection, arguments.days_to_keep, arguments.hours_to_keep, arguments.separator, arguments.prefix)
+    if arguments.hours_to_keep:
+        print 'Deleting hourly indices older than {0} hours.'.format(arguments.hours_to_keep)
+        expired_indices = find_expired_indices(connection, arguments.days_to_keep, arguments.hours_to_keep, arguments.separator, arguments.prefix)
+    if arguments.disk_space_to_keep:
+        print 'Let\'s keep disk usage lower than {0} GB.'.format(arguments.disk_space_to_keep)
+        expired_indices = find_overusage_indices(connection, arguments.disk_space_to_keep, arguments.separator, arguments.prefix)

+    print ''

+    for index_name, expired_by in expired_indices:
+        expiration = timedelta(seconds=expired_by)

+        if arguments.dry_run:
+            print 'Would have attempted deleting index {0} because it is {1} older than the calculated cutoff.'.format(index_name, expiration)
+            continue

+        print 'Deleting index {0} because it was {1} older than cutoff.'.format(index_name, expiration)

+        deletion = connection.delete_index_if_exists(index_name)
+        # ES returns a dict in the format {u'acknowledged': True, u'ok': True} on success.
+        if deletion.get('ok'):
+            print 'Successfully deleted index: {0}'.format(index_name)
+        else:
+            print 'Error deleting index: {0}. ({1})'.format(index_name, deletion)

+    print ''
+    print 'Done in {0}.'.format(timedelta(seconds=time.time()-start))


+if __name__ == '__main__':
+    main()