Merge pull request #57 from drewkerrigan/v2.0

Release V2.0
This commit is contained in:
Markus Opolka 2020-03-31 18:15:06 +02:00 committed by GitHub
commit 219e99386c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 620 additions and 268 deletions

27
.github/workflows/unittest.yml vendored Normal file
View File

@ -0,0 +1,27 @@
name: CI
on: [push, pull_request]
jobs:
gitHubActionForPytest:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.6, 3.7, 3.8]
name: GitHub Action
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Lint
run: |
pip3 install --upgrade pip wheel setuptools
pip3 install pylint
python3 -m pylint check_http_json.py
- name: Unit Test
run: |
python3 -m unittest discover
- name: Coverage
run: |
pip3 install coverage
python3 -m coverage run -m unittest discover
python3 -m coverage report -m --include check_http_json.py

64
.gitignore vendored Normal file
View File

@ -0,0 +1,64 @@
#Emacs
\#*
.\#*
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
.venv/
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
#Ipython Notebook
.ipynb_checkpoints

5
.pylintrc Normal file
View File

@ -0,0 +1,5 @@
# pylint config
[MESSAGES CONTROL]
disable=line-too-long, redefined-outer-name, too-many-arguments, too-many-instance-attributes, fixme, invalid-name, superfluous-parens, missing-function-docstring, missing-module-docstring, multiple-imports, no-else-return, too-many-return-statements
[MASTER]
ignore-patterns=^test.*

View File

@ -1,3 +1,5 @@
![CI](https://github.com/drewkerrigan/nagios-http-json/workflows/CI/badge.svg)
# Nagios Json Plugin
This is a generic plugin for Nagios which checks json values from a given HTTP endpoint against argument specified rules and determines the status and performance data for that service.
@ -36,7 +38,7 @@ Generic Nagios plugin which checks json values from a given endpoint against
argument specified rules and determines the status and performance data for
that service.
Version: 1.4.0 (2019-05-09)
Version: 2.0.0 (2020-03-22)
optional arguments:
-h, --help show this help message and exit
@ -206,7 +208,7 @@ More info about Nagios Range format and Units of Measure can be found at [https:
### Requirements
* Python 2.7
* Python 3
### Configuration

View File

@ -1,4 +1,14 @@
#!/usr/bin/python2.7
#!/usr/bin/env python3
import urllib.request, urllib.error, urllib.parse
import base64
import json
import argparse
import sys
import ssl
from pprint import pprint
from urllib.error import HTTPError
from urllib.error import URLError
plugin_description = \
"""
@ -9,26 +19,19 @@ argument specified rules and determines the status and performance data for
that service.
"""
import urllib2
import base64
import json
import argparse
import sys
import ssl
from pprint import pprint
from urllib2 import HTTPError
from urllib2 import URLError
OK_CODE = 0
WARNING_CODE = 1
CRITICAL_CODE = 2
UNKNOWN_CODE = 3
__version__ = '1.4.0'
__version_date__ = '2019-05-09'
__version__ = '2.0.0'
__version_date__ = '2020-03-22'
class NagiosHelper:
"""Help with Nagios specific status string formatting."""
"""
Help with Nagios specific status string formatting.
"""
message_prefixes = {OK_CODE: 'OK',
WARNING_CODE: 'WARNING',
CRITICAL_CODE: 'CRITICAL',
@ -38,17 +41,23 @@ class NagiosHelper:
critical_message = ''
unknown_message = ''
def getMessage(self):
"""Build a status-prefixed message with optional performance data
generated externally"""
text = "%s: Status %s." % (self.message_prefixes[self.getCode()],
self.message_prefixes[self.getCode()])
text += self.warning_message
text += self.critical_message
text += self.unknown_message
def getMessage(self, message=''):
"""
Build a status-prefixed message with optional performance data
generated externally
"""
message += self.warning_message
message += self.critical_message
message += self.unknown_message
code = self.message_prefixes[self.getCode()]
output = "{code}: Status {code}. {message}".format(code=code, message=message.strip())
if self.performance_data:
text += "|%s" % self.performance_data
return text
output = "{code}: {perf_data} Status {code}. {message}|{perf_data}".format(
code=code,
message=message.strip(),
perf_data=self.performance_data)
return output.strip()
def getCode(self):
code = OK_CODE
@ -69,19 +78,23 @@ class NagiosHelper:
def append_unknown(self, unknown_message):
self.unknown_message += unknown_message
def append_metrics(self, (performance_data,
warning_message, critical_message)):
def append_metrics(self, metrics):
(performance_data, warning_message, critical_message) = metrics
self.performance_data += performance_data
self.append_warning(warning_message)
self.append_critical(critical_message)
class JsonHelper:
"""Perform simple comparison operations against values in a given
JSON dict"""
def __init__(self, json_data, separator):
"""
Perform simple comparison operations against values in a given
JSON dict
"""
def __init__(self, json_data, separator, value_separator):
self.data = json_data
self.separator = separator
self.value_separator = value_separator
self.arrayOpener = '('
self.arrayCloser = ')'
@ -91,14 +104,14 @@ class JsonHelper:
remainingKey = key[separatorIndex + 1:]
if partialKey in data:
return self.get(remainingKey, data[partialKey])
else:
return (None, 'not_found')
return (None, 'not_found')
def getSubArrayElement(self, key, data):
subElemKey = key[:key.find(self.arrayOpener)]
index = int(key[key.find(self.arrayOpener) +
1:key.find(self.arrayCloser)])
remainingKey = key[key.find(self.arrayCloser + self.separator) + 2:]
if key.find(self.arrayCloser + self.separator) == -1:
remainingKey = key[key.find(self.arrayCloser) + 1:]
if subElemKey in data:
@ -106,6 +119,8 @@ class JsonHelper:
return self.get(remainingKey, data[subElemKey][index])
else:
return (None, 'not_found')
if index >= len(data):
return (None, 'not_found')
else:
if not subElemKey:
return self.get(remainingKey, data[index])
@ -114,7 +129,7 @@ class JsonHelper:
def equals(self, key, value):
return self.exists(key) and \
str(self.get(key)) in value.split(':')
str(self.get(key)) in value.split(self.value_separator)
def lte(self, key, value):
return self.exists(key) and float(self.get(key)) <= float(value)
@ -132,8 +147,11 @@ class JsonHelper:
return (self.get(key) != (None, 'not_found'))
def get(self, key, temp_data=''):
"""Can navigate nested json keys with a dot format
(Element.Key.NestedKey). Returns (None, 'not_found') if not found"""
"""
Can navigate nested json keys with a dot format
(Element.Key.NestedKey). Returns (None, 'not_found') if not found
"""
if temp_data:
data = temp_data
else:
@ -153,7 +171,7 @@ class JsonHelper:
if key.find(self.arrayOpener) != -1:
return self.getSubArrayElement(key, data)
else:
if key in data:
if isinstance(data, dict) and key in data:
return data[key]
else:
return (None, 'not_found')
@ -167,10 +185,10 @@ class JsonHelper:
subElemKey = key[:key.find('(*)')-1]
remainingKey = key[key.find('(*)')+3:]
elemData = self.get(subElemKey)
if elemData is (None, 'not_found'):
if elemData == (None, 'not_found'):
keys.append(key)
return keys
if subElemKey is not '':
if subElemKey != '':
subElemKey = subElemKey + '.'
for i in range(len(elemData)):
newKey = subElemKey + '(' + str(i) + ')' + remainingKey
@ -192,17 +210,24 @@ def _getKeyAlias(original_key):
class JsonRuleProcessor:
"""Perform checks and gather values from a JSON dict given rules
and metrics definitions"""
"""
Perform checks and gather values from a JSON dict given rules
and metrics definitions
"""
def __init__(self, json_data, rules_args):
self.data = json_data
self.rules = rules_args
separator = '.'
value_separator = ':'
if self.rules.separator:
separator = self.rules.separator
self.helper = JsonHelper(self.data, separator)
if self.rules.value_separator:
value_separator = self.rules.value_separator
self.helper = JsonHelper(self.data, separator, value_separator)
debugPrint(rules_args.debug, "rules:%s" % rules_args)
debugPrint(rules_args.debug, "separator:%s" % separator)
debugPrint(rules_args.debug, "value_separator:%s" % value_separator)
self.metric_list = self.expandKeys(self.rules.metric_list)
self.key_threshold_warning = self.expandKeys(
self.rules.key_threshold_warning)
@ -222,7 +247,7 @@ class JsonRuleProcessor:
def expandKeys(self, src):
if src is None:
return
return []
dest = []
for key in src:
newKeys = self.helper.expandKey(key, [])
@ -243,9 +268,9 @@ class JsonRuleProcessor:
for kv in equality_list:
k, v = kv.split(',')
key, alias = _getKeyAlias(k)
if (self.helper.equals(key, v) == False):
if not self.helper.equals(key, v):
failure += " Key %s mismatch. %s != %s" % (alias, v,
self.helper.get(key))
self.helper.get(key))
return failure
def checkNonEquality(self, equality_list):
@ -253,9 +278,9 @@ class JsonRuleProcessor:
for kv in equality_list:
k, v = kv.split(',')
key, alias = _getKeyAlias(k)
if (self.helper.equals(key, v) == True):
if self.helper.equals(key, v):
failure += " Key %s match found. %s == %s" % (alias, v,
self.helper.get(key))
self.helper.get(key))
return failure
def checkThreshold(self, key, alias, r):
@ -338,8 +363,11 @@ class JsonRuleProcessor:
return unknown
def checkMetrics(self):
"""Return a Nagios specific performance metrics string given keys
and parameter definitions"""
"""
Return a Nagios specific performance metrics string given keys
and parameter definitions
"""
metrics = ''
warning = ''
critical = ''
@ -380,11 +408,16 @@ class JsonRuleProcessor:
return ("%s" % metrics, warning, critical)
def parseArgs():
def parseArgs(args):
"""
CLI argument definitions and parsing
"""
parser = argparse.ArgumentParser(
description = plugin_description + '\n\nVersion: %s (%s)'
%(__version__, __version_date__),
formatter_class=argparse.RawDescriptionHelpFormatter)
description=plugin_description + '\n\nVersion: %s (%s)'
%(__version__, __version_date__),
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('-d', '--debug', action='store_true',
help='debug mode')
@ -416,6 +449,8 @@ def parseArgs():
parser.add_argument('-f', '--field_separator', dest='separator',
help='''JSON Field separator, defaults to ".";
Select element in an array with "(" ")"''')
parser.add_argument('-F', '--value_separator', dest='value_separator',
help='''JSON Value separator, defaults to ":"''')
parser.add_argument('-w', '--warning', dest='key_threshold_warning',
nargs='*',
help='''Warning threshold for these values
@ -474,220 +509,31 @@ def parseArgs():
(key[>alias],UnitOfMeasure,WarnRange,
CriticalRange).''')
return parser.parse_args()
return parser.parse_args(args)
def debugPrint(debug_flag, message, pretty_flag=False):
"""
Print debug messages if -d (debug_flat ) is set.
"""
if debug_flag:
if pretty_flag:
pprint(message)
else:
print(message)
if __name__ == "__main__" and len(sys.argv) >= 2 and sys.argv[1] == 'UnitTest':
import unittest
class RulesHelper:
separator = '.'
debug = False
key_threshold_warning = None
key_value_list = None
key_value_list_not = None
key_list = None
key_threshold_critical = None
key_value_list_critical = None
key_value_list_not_critical = None
key_value_list_unknown = None
key_list_critical = None
metric_list = None
def dash_m(self, data):
self.metric_list = data
return self
def dash_e(self, data):
self.key_list = data
return self
def dash_E(self, data):
self.key_list_critical = data
return self
def dash_q(self, data):
self.key_value_list = data
return self
def dash_Q(self, data):
self.key_value_list_critical = data
return self
def dash_y(self, data):
self.key_value_list_not = data
return self
def dash_Y(self, data):
self.key_value_list_not_critical = data
return self
def dash_w(self, data):
self.key_threshold_warning = data
return self
def dash_c(self, data):
self.key_threshold_critical = data
return self
class UnitTest(unittest.TestCase):
rules = RulesHelper()
def check_data(self, args, jsondata, code):
data = json.loads(jsondata)
nagios = NagiosHelper()
processor = JsonRuleProcessor(data, args)
nagios.append_warning(processor.checkWarning())
nagios.append_critical(processor.checkCritical())
nagios.append_metrics(processor.checkMetrics())
self.assertEqual(code, nagios.getCode())
def test_metrics(self):
self.check_data(RulesHelper().dash_m(['metric,,1:4,1:5']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_m(['metric,,1:5,1:4']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_m(['metric,,1:5,1:5,6,10']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_m(['metric,,1:5,1:5,1,4']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_m(['metric,s,@1:4,@6:10,1,10']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_m(['(*).value,s,1:5,1:5']),
'[{"value": 5},{"value": 100}]', CRITICAL_CODE)
def test_exists(self):
self.check_data(RulesHelper().dash_e(['nothere']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_E(['nothere']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_e(['metric']),
'{"metric": 5}', OK_CODE)
def test_equality(self):
self.check_data(RulesHelper().dash_q(['metric,6']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_Q(['metric,6']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_q(['metric,5']),
'{"metric": 5}', OK_CODE)
def test_non_equality(self):
self.check_data(RulesHelper().dash_y(['metric,6']),
'{"metric": 6}', WARNING_CODE)
self.check_data(RulesHelper().dash_Y(['metric,6']),
'{"metric": 6}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_y(['metric,5']),
'{"metric": 6}', OK_CODE)
def test_warning_thresholds(self):
self.check_data(RulesHelper().dash_w(['metric,5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,5:']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,~:5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,1:5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,@5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,@5:']),
'{"metric": 4}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,@~:5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,@1:5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,5']),
'{"metric": 6}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,5:']),
'{"metric": 4}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,~:5']),
'{"metric": 6}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,1:5']),
'{"metric": 6}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,@5']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,@5:']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,@~:5']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,@1:5']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['(*).value,@1:5']),
'[{"value": 5},{"value": 1000}]', WARNING_CODE)
def test_critical_thresholds(self):
self.check_data(RulesHelper().dash_c(['metric,5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,5:']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,~:5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,1:5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,@5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,@5:']),
'{"metric": 4}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,@~:5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,@1:5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,5']),
'{"metric": 6}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,5:']),
'{"metric": 4}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,~:5']),
'{"metric": 6}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,1:5']),
'{"metric": 6}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,@5']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,@5:']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,@~:5']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,@1:5']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['(*).value,@1:5']),
'[{"value": 5},{"value": 1000}]', CRITICAL_CODE)
def test_separator(self):
rules = RulesHelper()
rules.separator = '_'
self.check_data(
rules.dash_q(
['(0)_gauges_jvm.buffers.direct.capacity(1)_value,1234']),
'''[{ "gauges": { "jvm.buffers.direct.capacity": [
{"value": 215415},{"value": 1234}]}}]''',
OK_CODE)
self.check_data(
rules.dash_q(
['(*)_gauges_jvm.buffers.direct.capacity(1)_value,1234']),
'''[{ "gauges": { "jvm.buffers.direct.capacity": [
{"value": 215415},{"value": 1234}]}},
{ "gauges": { "jvm.buffers.direct.capacity": [
{"value": 215415},{"value": 1235}]}}]''',
WARNING_CODE)
unittest.main()
exit(0)
"""Program entry point"""
# Program entry point
if __name__ == "__main__":
args = parseArgs()
args = parseArgs(sys.argv[1:])
nagios = NagiosHelper()
context = None
if args.version:
print('Version: %s - Date: %s' % (__version__, __version_date__))
exit(0)
sys.exit(0)
if args.ssl:
url = "https://%s" % args.host
@ -705,25 +551,25 @@ if __name__ == "__main__":
context.load_verify_locations(args.cacert)
except ssl.SSLError:
nagios.append_unknown(
''' Error loading SSL CA cert "%s"!'''
% args.cacert)
'Error loading SSL CA cert "%s"!'
% args.cacert)
if args.cert:
try:
context.load_cert_chain(args.cert,keyfile=args.key)
context.load_cert_chain(args.cert, keyfile=args.key)
except ssl.SSLError:
if args.key:
nagios.append_unknown(
''' Error loading SSL cert. Make sure key "%s" belongs to cert "%s"!'''
% (args.key, args.cert))
'Error loading SSL cert. Make sure key "%s" belongs to cert "%s"!'
% (args.key, args.cert))
else:
nagios.append_unknown(
''' Error loading SSL cert. Make sure "%s" contains the key as well!'''
% (args.cert))
'Error loading SSL cert. Make sure "%s" contains the key as well!'
% (args.cert))
if nagios.getCode() != OK_CODE:
print(nagios.getMessage())
exit(nagios.getCode())
sys.exit(nagios.getCode())
else:
url = "http://%s" % args.host
@ -731,13 +577,16 @@ if __name__ == "__main__":
url += ":%s" % args.port
if args.path:
url += "/%s" % args.path
debugPrint(args.debug, "url:%s" % url)
json_data = ''
try:
req = urllib2.Request(url)
req = urllib.request.Request(url)
req.add_header("User-Agent", "check_http_json")
if args.auth:
base64str = base64.encodestring(args.auth).replace('\n', '')
authbytes = str(args.auth).encode()
base64str = base64.encodebytes(authbytes).decode().replace('\n', '')
req.add_header('Authorization', 'Basic %s' % base64str)
if args.headers:
headers = json.loads(args.headers)
@ -745,15 +594,15 @@ if __name__ == "__main__":
for header in headers:
req.add_header(header, headers[header])
if args.timeout and args.data:
response = urllib2.urlopen(req, timeout=args.timeout,
data=args.data, context=context)
response = urllib.request.urlopen(req, timeout=args.timeout,
data=args.data, context=context)
elif args.timeout:
response = urllib2.urlopen(req, timeout=args.timeout,
context=context)
response = urllib.request.urlopen(req, timeout=args.timeout,
context=context)
elif args.data:
response = urllib2.urlopen(req, data=args.data, context=context)
response = urllib.request.urlopen(req, data=args.data, context=context)
else:
response = urllib2.urlopen(req, context=context)
response = urllib.request.urlopen(req, context=context)
json_data = response.read()
@ -779,6 +628,6 @@ if __name__ == "__main__":
# Print Nagios specific string and exit appropriately
print(nagios.getMessage())
exit(nagios.getCode())
sys.exit(nagios.getCode())
#EOF

0
test/__init__.py Normal file
View File

34
test/test_args.py Normal file
View File

@ -0,0 +1,34 @@
#!/usr/bin/env python3
import unittest
import sys
sys.path.append('..')
from check_http_json import *
class ArgsTest(unittest.TestCase):
"""
Tests for argsparse
"""
def test_parser_defaults(self):
parser = parseArgs(['-H', 'foobar'])
self.assertFalse(parser.debug)
self.assertFalse(parser.ssl)
self.assertFalse(parser.insecure)
def test_parser_with_debug(self):
parser = parseArgs(['-H', 'foobar', '-d'])
self.assertTrue(parser.debug)
def test_parser_with_port(self):
parser = parseArgs(['-H', 'foobar', '-P', '8888'])
self.assertEqual(parser.port, '8888')
def test_parser_with_separator(self):
parser = parseArgs(['-H', 'foobar', '-f', '_', '-F', '_'])
self.assertEqual(parser.separator, '_')
self.assertEqual(parser.value_separator, '_')

View File

@ -0,0 +1,276 @@
#!/usr/bin/env python3
import json
import unittest
from unittest.mock import patch
import sys
sys.path.append('..')
from check_http_json import *
OK_CODE = 0
WARNING_CODE = 1
CRITICAL_CODE = 2
UNKNOWN_CODE = 3
class RulesHelper:
separator = '.'
value_separator = ':'
debug = False
key_threshold_warning = None
key_value_list = None
key_value_list_not = None
key_list = None
key_threshold_critical = None
key_value_list_critical = None
key_value_list_not_critical = None
key_value_list_unknown = None
key_list_critical = None
metric_list = None
def dash_m(self, data):
self.metric_list = data
return self
def dash_e(self, data):
self.key_list = data
return self
def dash_E(self, data):
self.key_list_critical = data
return self
def dash_q(self, data):
self.key_value_list = data
return self
def dash_Q(self, data):
self.key_value_list_critical = data
return self
def dash_y(self, data):
self.key_value_list_not = data
return self
def dash_Y(self, data):
self.key_value_list_not_critical = data
return self
def dash_U(self, data):
self.key_value_list_unknown = data
return self
def dash_w(self, data):
self.key_threshold_warning = data
return self
def dash_c(self, data):
self.key_threshold_critical = data
return self
class UtilTest(unittest.TestCase):
"""
Tests for the util fucntions
"""
rules = RulesHelper()
def check_data(self, args, jsondata, code):
data = json.loads(jsondata)
nagios = NagiosHelper()
processor = JsonRuleProcessor(data, args)
nagios.append_warning(processor.checkWarning())
nagios.append_critical(processor.checkCritical())
nagios.append_metrics(processor.checkMetrics())
nagios.append_unknown(processor.checkUnknown())
self.assertEqual(code, nagios.getCode())
def test_metrics(self):
self.check_data(RulesHelper().dash_m(['metric,,1:4,1:5']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_m(['metric,,1:5,1:4']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_m(['metric,,1:5,1:5,6,10']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_m(['metric,,1:5,1:5,1,4']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_m(['metric,s,@1:4,@6:10,1,10']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_m(['(*).value,s,1:5,1:5']),
'[{"value": 5},{"value": 100}]', CRITICAL_CODE)
self.check_data(RulesHelper().dash_m(['metric>foobar,,1:4,1:5']),
'{"metric": 5}', WARNING_CODE)
def test_unknown(self):
self.check_data(RulesHelper().dash_U(['metric,0']),
'{"metric": 3}', UNKNOWN_CODE)
def test_exists(self):
self.check_data(RulesHelper().dash_e(['nothere']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_E(['nothere']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_e(['metric']),
'{"metric": 5}', OK_CODE)
def test_equality(self):
self.check_data(RulesHelper().dash_q(['metric,6']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_Q(['metric,6']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_q(['metric,5']),
'{"metric": 5}', OK_CODE)
def test_equality_colon(self):
"""
See https://github.com/drewkerrigan/nagios-http-json/issues/43
"""
rules = RulesHelper()
rules.value_separator = '_'
# This should not fail
self.check_data(rules.dash_q(['metric,foo:bar']),
'{"metric": "foo:bar"}', OK_CODE)
def test_non_equality(self):
self.check_data(RulesHelper().dash_y(['metric,6']),
'{"metric": 6}', WARNING_CODE)
self.check_data(RulesHelper().dash_Y(['metric,6']),
'{"metric": 6}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_y(['metric,5']),
'{"metric": 6}', OK_CODE)
def test_warning_thresholds(self):
self.check_data(RulesHelper().dash_w(['metric,5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,5:']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,~:5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,1:5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,@5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,@5:']),
'{"metric": 4}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,@~:5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,@1:5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_w(['metric,5']),
'{"metric": 6}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,5:']),
'{"metric": 4}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,~:5']),
'{"metric": 6}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,1:5']),
'{"metric": 6}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,@5']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,@5:']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,@~:5']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['metric,@1:5']),
'{"metric": 5}', WARNING_CODE)
self.check_data(RulesHelper().dash_w(['(*).value,@1:5']),
'[{"value": 5},{"value": 1000}]', WARNING_CODE)
def test_critical_thresholds(self):
self.check_data(RulesHelper().dash_c(['metric,5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,5:']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,~:5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,1:5']),
'{"metric": 5}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,@5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,@5:']),
'{"metric": 4}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,@~:5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,@1:5']),
'{"metric": 6}', OK_CODE)
self.check_data(RulesHelper().dash_c(['metric,5']),
'{"metric": 6}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,5:']),
'{"metric": 4}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,~:5']),
'{"metric": 6}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,1:5']),
'{"metric": 6}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,@5']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,@5:']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,@~:5']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['metric,@1:5']),
'{"metric": 5}', CRITICAL_CODE)
self.check_data(RulesHelper().dash_c(['(*).value,@1:5']),
'[{"value": 5},{"value": 1000}]', CRITICAL_CODE)
def test_separator(self):
rules = RulesHelper()
rules.separator = '_'
self.check_data(
rules.dash_q(
['(0)_gauges_jvm.buffers.direct.capacity(1)_value,1234']),
'''[{ "gauges": { "jvm.buffers.direct.capacity": [
{"value": 215415},{"value": 1234}]}}]''',
OK_CODE)
self.check_data(
rules.dash_q(
['(*)_gauges_jvm.buffers.direct.capacity(1)_value,1234']),
'''[{ "gauges": { "jvm.buffers.direct.capacity": [
{"value": 215415},{"value": 1234}]}},
{ "gauges": { "jvm.buffers.direct.capacity": [
{"value": 215415},{"value": 1235}]}}]''',
WARNING_CODE)
def test_array_with_missing_element(self):
"""
See https://github.com/drewkerrigan/nagios-http-json/issues/34
"""
rules = RulesHelper()
# This should simply work
data = '[{"Node": "there"}]'
self.check_data(rules.dash_q(['(0).Node,there']), data, OK_CODE)
# This should warn us
data = '[{"Node": "othervalue"}]'
self.check_data(rules.dash_q(['(0).Node,there']), data, WARNING_CODE)
# # This should not throw an IndexError
data = '[{"Node": "foobar"}]'
self.check_data(rules.dash_q(['(0).Node,foobar', '(1).Node,missing']), data, WARNING_CODE)
self.check_data(rules.dash_q(['(0).Node,foobar', '(1).Node,missing', '(2).Node,alsomissing']), data, WARNING_CODE)
# This should not throw a KeyError
data = '{}'
self.check_data(rules.dash_q(['(0).Node,foobar', '(1).Node,missing']), data, WARNING_CODE)
def test_subelem(self):
rules = RulesHelper()
data = '{"foo": {"foo": {"foo": "bar"}}}'
self.check_data(rules.dash_E(['foo.foo.foo.foo.foo']), data, CRITICAL_CODE)
def test_subarrayelem_missing_elem(self):
rules = RulesHelper()
data = '[{"capacity": {"value": 1000}},{"capacity": {"value": 2200}}]'
self.check_data(rules.dash_E(['(*).capacity.value']), data, OK_CODE)
self.check_data(rules.dash_E(['(*).capacity.value.too_deep']), data, CRITICAL_CODE)
# Should not throw keyerror
self.check_data(rules.dash_E(['foo']), data, CRITICAL_CODE)

44
test/test_main.py Normal file
View File

@ -0,0 +1,44 @@
#!/usr/bin/env python3
import unittest
import unittest.mock as mock
import sys
import os
sys.path.append('..')
from check_http_json import debugPrint
class MainTest(unittest.TestCase):
"""
Tests for main
"""
def setUp(self):
"""
Defining the exitcodes
"""
self.exit_0 = 0 << 8
self.exit_1 = 1 << 8
self.exit_2 = 2 << 8
self.exit_3 = 3 << 8
def test_debugprint(self):
with mock.patch('builtins.print') as mock_print:
debugPrint(True, 'debug')
mock_print.assert_called_once_with('debug')
def test_debugprint_pprint(self):
with mock.patch('check_http_json.pprint') as mock_pprint:
debugPrint(True, 'debug', True)
mock_pprint.assert_called_once_with('debug')
def test_cli_without_params(self):
command = '/usr/bin/env python3 check_http_json.py > /dev/null 2>&1'
status = os.system(command)
self.assertEqual(status, self.exit_2)

51
test/test_nagioshelper.py Normal file
View File

@ -0,0 +1,51 @@
#!/usr/bin/env python3
import json
import unittest
from unittest.mock import patch
import sys
sys.path.append('..')
from check_http_json import *
class NagiosHelperTest(unittest.TestCase):
"""
Tests for the NagiosHelper
"""
def test_getcode_default(self):
helper = NagiosHelper()
self.assertEqual(0, helper.getCode())
def test_getcode_warning(self):
helper = NagiosHelper()
helper.warning_message = 'foobar'
self.assertEqual(1, helper.getCode())
def test_getcode_critical(self):
helper = NagiosHelper()
helper.critical_message = 'foobar'
self.assertEqual(2, helper.getCode())
def test_getcode_unknown(self):
helper = NagiosHelper()
helper.unknown_message = 'foobar'
self.assertEqual(3, helper.getCode())
def test_getmessage_default(self):
helper = NagiosHelper()
self.assertEqual('OK: Status OK.', helper.getMessage())
def test_getmessage_perfomance_data(self):
helper = NagiosHelper()
helper.performance_data = 'foobar'
self.assertEqual('OK: foobar Status OK. |foobar', helper.getMessage())