Edit file File name : check_jetbackup_alerts Content :#!/usr/bin/python import json import subprocess import sys from datetime import datetime, timedelta import argparse def is_command_available(command): try: subprocess.call([command], stdout=subprocess.PIPE, stderr=subprocess.PIPE) return True except OSError: return False def execute_jetapi_command(): command = ["jetapi", "backup", "-F", "listAlerts", "-O", "json"] process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, _ = process.communicate() return output.decode("utf-8") def execute_jetbackup5api_command(): command = ["jetbackup5api", "-F", "listAlerts", "-D", "sort[created]=-1", "-O", "json"] process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, _ = process.communicate() return output.decode("utf-8") def get_current_users_count(): try: command = "whmapi1 --output=jsonpretty get_current_users_count" process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) output, _ = process.communicate() response_data = json.loads(output) users_count = response_data['data']['users'] return users_count except: command = "plesk bin customer --list | wc -l" process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) output, _ = process.communicate() users_count = output.strip() return int(users_count) def get_critical_titles(alerts, hours, command): titles = "" current_time = datetime.now() threshold_time = current_time - timedelta(hours=hours) exclude = ["Restore process for the account"] for alert in alerts: created_str = alert["created"] created_datetime = datetime.strptime(created_str[:-6], "%Y-%m-%dT%H:%M:%S") if command == "jetapi": if alert.get("level", "0") == 128 and created_datetime > threshold_time: title = alert["title"] if any(exc in title for exc in exclude): continue # Skip this alert and move to the next one if titles: titles += " | " + title else: titles += title elif command == "jetbackup5api": if alert.get("level", "0") == 4 and created_datetime > threshold_time: title = alert["title"] if any(exc in title for exc in exclude): continue # Skip this alert and move to the next one if titles: titles += " | " + title else: titles += title return titles # Check which command is available and execute the corresponding one if is_command_available("jetbackup5api"): output = execute_jetbackup5api_command() command = "jetbackup5api" elif is_command_available("jetapi"): output = execute_jetapi_command() command = "jetapi" else: print("JetBackup not found on the server.") sys.exit(0) # Parse the JSON output data = json.loads(output) alerts = data["data"]["alerts"] # Parse command line arguments parser = argparse.ArgumentParser() parser.add_argument("-c", "--critical-hours", type=int, default=24, help="Threshold hours for critical alerts") parser.add_argument("-w", "--warning-hours", type=int, default=12, help="Threshold hours for warning alerts") args = parser.parse_args() # Get current users count users_count = get_current_users_count() # Get critical and warning titles critical_titles = get_critical_titles(alerts, args.critical_hours, command) warning_titles = get_critical_titles(alerts, args.warning_hours, command) # Check if there are titles and print accordingly found_no_accounts = any("No accounts found for backup" in alert.get("message", "") for alert in alerts) jb_config_export = any("JB configurations export Backup job Failed" in alert.get("title", "") for alert in alerts) jb_download_backup = any("Download process for the account" in alert.get("title", "") for alert in alerts) if critical_titles: if users_count > 0 and found_no_accounts and not jb_config_export: print("[CRITICAL]: " + critical_titles + ". (" + str(users_count) + " users)") sys.exit(2) elif users_count == 0 and found_no_accounts: print("[OK]: No accounts found for backup. (" + str(users_count) + " users)") sys.exit(0) elif jb_config_export and len(critical_titles.split("|")) == 1: print("[WARNING]: JB configurations export Backup job Failed. (" + str(users_count) + " users)") sys.exit(1) elif jb_download_backup and len(critical_titles.split("|")) == 1: print("[WARNING]: " + critical_titles + ". (" + str(users_count) + " users)") sys.exit(1) else: print("[CRITICAL]: " + critical_titles + ". (" + str(users_count) + " users)") sys.exit(2) elif warning_titles: if users_count > 0 and found_no_accounts: print("[WARNING]: " + warning_titles + ". (" + str(users_count) + " users)") sys.exit(1) elif users_count == 0 and found_no_accounts: print("[OK]: No accounts found for backup. (" + str(users_count) + " users)") sys.exit(0) else: print("[WARNING]: " + warning_titles + ". (" + str(users_count) + " users)") sys.exit(1) else: print("JetBackup is OK (" + str(users_count) + " users)") sys.exit(0) Save