#!/usr/bin/env python ''' Monitor ActiveMQ server via its http web interface ''' from HTMLParser import HTMLParser from optparse import OptionParser import xml.etree.ElementTree as ET import json import urllib2 import urllib EXIT_CODE = { 'OK': 0, 'WARN': 1, 'CRIT': 2, 'UNKNOWN': 3, } def prepareOpts(): ''' Parse options from the shell ''' cmds = { 'queues' : 'notify if there is a least one queue with no consumers (--exlcude)', 'consumer': 'notify if the specific consumers does not consume the queues (--queues, --client)', } def err( string ): print 'Error: {0}'.format( string ) print __doc__ parser.print_help() print '\nTypes:' for k in cmds: print ' {0}: {1}'.format(k ,cmds[k]) exit(1) parser = OptionParser() parser.add_option('-H', '--server', dest='server', type='string', help='ActiveMQ fqdn or ip', default='localhost') parser.add_option('-U', '--user', dest='user', type='string', help='http username', default=None) parser.add_option('-P', '--password', dest='password', type='string', help='http password', default=None) parser.add_option('-p', '--port', dest='port', type='int', help='ActiveMQ web interface port', default=8161) parser.add_option('-t', '--timeout', dest='timeout', type='float', help='how many seconds to wait for each http request', default=5) parser.add_option('-T', '--type', dest='type', type='choice', choices=cmds.keys(), help='what to check: {0}'.format(cmds.keys()) ) parser.add_option('-e', '--exclude', dest='exclude', type='string', help='csv list of queues to exclude (implies -T queues)', default=None ) parser.add_option('-q', '--queues', dest='queues', type='string', help='csv list of queues (implies -T consumer)', default=None) parser.add_option('-c', '--client', dest='client', type='string', help='the client prefix to search (implies -T consumer)', default=None ) (opts, args) = parser.parse_args() kargs = {} if opts.user is None and opts.password is not None: err('missing -P') elif opts.password is None and opts.user is not None: err('missing -U') if not opts.type: err('missing -T') elif opts.type == 'consumer': if opts.client is None: err('missing -c') if opts.queues is None: err('missing -q') else: kargs.update({'queues': opts.queues.split(',')}) kargs.update({'client': opts.client}) elif opts.exclude is not None: kargs.update({'exclude': opts.exclude.split(',')}) return (opts, kargs) class AmqException(Exception): def __init__(self, msg, xcode): self.msg = msg self.xcode = xcode def __str__(self): return self.msg def getXcode(self): return self.xcode def getMsg(self): return self.msg class ConsumerHTMLParser(HTMLParser): ''' Parse the consumers id from http://url/admin/queueConsumers.jsp?JMSDestination=QUEUENAME ''' consumers = [] table = False body = False tr = False td = False a = False def reset_vars(self): self.consumers = [] self.table = False self.body = False self.tr = False self.td = False self.a = False def handle_starttag(self, tag, attrs): if self.td and tag == 'a': self.a = True elif self.tr and tag == 'td': self.td = True elif self.body and tag == 'tr': self.tr = True elif self.table and tag == 'tbody': self.body = True elif tag == 'table': self.table = ('id', 'messages') in attrs def handle_data(self, data): if self.a: if not data in self.consumers: self.consumers.append( data ) self.a = False self.td = False self.tr = False def get_consumers(self): return self.consumers class ActivemqMonitor(): ''' Monitor ActiveMQ via http web interface ''' def __init__(self, server, port, timeout, user=None, password=None, realm='ActiveMQRealm'): self.url = 'http://{0}:{1}'.format(server, port) self.server = server self.port = port self.timeout = timeout self.user = user self.password = password self.realm = realm if user is not None and password is not None: urllib2.install_opener( self._auth( self.url, self.user, self.password, self.realm ) ) def _auth(self, uri, user, password, realm): ''' returns a authentication handler. ''' basic = urllib2.HTTPBasicAuthHandler() basic.add_password( realm=realm, uri=uri, user=user, passwd=password ) digest = urllib2.HTTPDigestAuthHandler() digest.add_password( realm=realm, uri=uri, user=user, passwd=password ) return urllib2.build_opener(basic, digest) def _wget(self, url): ''' create the http request to AMQ web UI ''' try: ret = urllib2.urlopen(url, timeout=self.timeout).read() except urllib2.URLError: raise AmqException( 'UNKNOWN: Could not create http request to ActiveMQ', EXIT_CODE['UNKNOWN'] ) return ret def _getQueueConsumers(self, queue, parser): ''' Get the parsed data of the queue ''' url = '{0}/admin/queueConsumers.jsp?{1}'.format( self.url, urllib.urlencode( { 'JMSDestination': queue } ), ) parser.reset_vars() parser.feed( self._wget(url) ) return parser.get_consumers() def _eval_queues(self, res, opts): if res: return { 'msg': 'CRIT: the following queues are not consumed {0}'.format(','.join(res)), 'exit': EXIT_CODE['CRIT'] } else: return { 'msg': 'OK: All the queues are consumed', 'exit': EXIT_CODE['OK'] } def _eval_consumer(self, res, opts): if res: return { 'msg': 'CRIT: the following queues are not consumed by {0} {1}'.format( opts.client, ','.join(res) ), 'exit': EXIT_CODE['CRIT'] } else: return { 'msg': 'OK: {0} is consuming all of its queues'.format(opts.client), 'exit': EXIT_CODE['OK'] } def queues(self, exclude=[]): ''' return all the queues with zero consumers ''' errors = [] url = '{0}/admin/xml/queues.jsp'.format(self.url, ) html = self._wget(url) for q in ET.fromstring( html ).findall('queue'): if q.get('name') not in exclude and int(q.find('stats').get('consumerCount')) <= 0: errors.append(q.get('name')) return errors def consumer(self, client, queues): ''' check if the clientid is configured as a subscriber on the queue ''' missing = list(queues) parser = ConsumerHTMLParser() for queue in queues: for consumer in self._getQueueConsumers(queue, parser): if client in consumer: missing.remove(queue) break return missing @staticmethod def main(): ''' Main function ''' (opts, kargs) = prepareOpts() amq = ActivemqMonitor( opts.server, opts.port, opts.timeout, opts.user, opts.password ) method = getattr(amq, opts.type) try: res = method(**kargs) except AmqException as e: print e.getMsg() exit(e.getXcode()) eval_method = getattr( amq, '_eval_{0}'.format(opts.type) ) ret = eval_method(res, opts) print ret['msg'] exit(ret['exit']) if __name__ == '__main__': ActivemqMonitor.main()