Add flag to override URL unreachable state

- I refactored the Nagios helper a bit to integrate this functionality a bit simpler.
   Before we had distinct methods on the helper that added warn,crit,unko message, now
   there's a general method that takes an int as parameter.
   This way we avoid if-else structures for the new functionality.
This commit is contained in:
Markus Opolka 2024-03-18 10:18:04 +01:00
parent e96bba0eb8
commit 4fbb0c828a
2 changed files with 28 additions and 22 deletions

View File

@ -69,20 +69,19 @@ class NagiosHelper:
code = UNKNOWN_CODE
return code
def append_warning(self, warning_message):
self.warning_message += warning_message
def append_critical(self, critical_message):
self.critical_message += critical_message
def append_unknown(self, unknown_message):
self.unknown_message += unknown_message
def append_message(self, code, msg):
if code > 2 or code < 0:
self.unknown_message += msg
if code == 1:
self.warning_message += msg
if code == 2:
self.critical_message += msg
def append_metrics(self, metrics):
(performance_data, warning_message, critical_message) = metrics
self.performance_data += performance_data
self.append_warning(warning_message)
self.append_critical(critical_message)
self.append_message(WARNING_CODE, warning_message)
self.append_message(CRITICAL_CODE, critical_message)
class JsonHelper:
@ -444,6 +443,8 @@ def parseArgs(args):
parser.add_argument('-p', '--path', dest='path', help='Path')
parser.add_argument('-t', '--timeout', type=int,
help='Connection timeout (seconds)')
parser.add_argument('--unreachable-state', type=int, default=3,
help='Exit with specified code if URL unreachable. Examples: 1 for Warning, 2 for Critical, 3 for Unknown (default: 3)')
parser.add_argument('-B', '--basic-auth', dest='auth',
help='Basic auth string "username:password"')
parser.add_argument('-D', '--data', dest='data',
@ -548,15 +549,15 @@ def prepare_context(args):
try:
context.load_verify_locations(args.cacert)
except ssl.SSLError:
nagios.append_unknown('Error loading SSL CA cert "%s"!' % args.cacert)
nagios.append_message(UNKNOWN_CODE, 'Error loading SSL CA cert "%s"!' % args.cacert)
if args.cert:
try:
context.load_cert_chain(args.cert, keyfile=args.key)
except ssl.SSLError:
if args.key:
nagios.append_unknown('Error loading SSL cert. Make sure key "%s" belongs to cert "%s"!' % (args.key, args.cert))
nagios.append_message(UNKNOWN_CODE, 'Error loading SSL cert. Make sure key "%s" belongs to cert "%s"!' % (args.key, args.cert))
else:
nagios.append_unknown('Error loading SSL cert. Make sure "%s" contains the key as well!' % (args.cert))
nagios.append_message(UNKNOWN_CODE, 'Error loading SSL cert. Make sure "%s" contains the key as well!' % (args.cert))
if nagios.getCode() != OK_CODE:
print(nagios.getMessage())
@ -630,24 +631,29 @@ def main(cliargs):
if "json" in e.info().get_content_subtype():
json_data = e.read()
else:
nagios.append_unknown(" HTTPError[%s], url:%s" % (str(e.code), url))
nagios.append_message(UNKNOWN_CODE, " Could not find JSON in HTTP body. HTTPError[%s], url:%s" % (str(e.code), url))
except URLError as e:
nagios.append_critical(" URLError[%s], url:%s" % (str(e.reason), url))
# Some users might prefer another exit code if the URL wasn't reached
exit_code = args.unreachable_state
nagios.append_message(exit_code, " URLError[%s], url:%s" % (str(e.reason), url))
# Since we don't got any data, we can simply exit
print(nagios.getMessage())
sys.exit(nagios.getCode())
try:
data = json.loads(json_data)
except ValueError as e:
nagios.append_unknown(" Parser error: %s" % str(e))
nagios.append_message(UNKNOWN_CODE, " JSON Parser error: %s" % str(e))
else:
debugPrint(args.debug, 'json:')
debugPrint(args.debug, data, True)
# Apply rules to returned JSON data
processor = JsonRuleProcessor(data, args)
nagios.append_warning(processor.checkWarning())
nagios.append_critical(processor.checkCritical())
nagios.append_message(WARNING_CODE, processor.checkWarning())
nagios.append_message(CRITICAL_CODE, processor.checkCritical())
nagios.append_metrics(processor.checkMetrics())
nagios.append_unknown(processor.checkUnknown())
nagios.append_message(UNKNOWN_CODE, processor.checkUnknown())
# Print Nagios specific string and exit appropriately
print(nagios.getMessage())

View File

@ -84,10 +84,10 @@ class UtilTest(unittest.TestCase):
data = json.loads(jsondata)
nagios = NagiosHelper()
processor = JsonRuleProcessor(data, args)
nagios.append_warning(processor.checkWarning())
nagios.append_critical(processor.checkCritical())
nagios.append_message(WARNING_CODE, processor.checkWarning())
nagios.append_message(CRITICAL_CODE, processor.checkCritical())
nagios.append_metrics(processor.checkMetrics())
nagios.append_unknown(processor.checkUnknown())
nagios.append_message(UNKNOWN_CODE, processor.checkUnknown())
self.assertEqual(code, nagios.getCode())
def test_metrics(self):