#!/usr/bin/env python3
"""
JetBackup Health Check Plugin for Icinga

Description:
  - Checks JetBackup MongoDB connection
  - Verifies JetBackup license status
  - Monitors backup job durations (--cdur / --wdur)
  - Monitors recent JetBackup alerts (--calert / --walert)
  - Prints backup destination(s)

Usage:
  ./check_jetbackup.py [--cdur 24] [--wdur 12] [--calert 24] [--walert 12]

Exit Codes:
  0 OK
  1 WARNING
  2 CRITICAL
  3 UNKNOWN
"""

import argparse
import json
import os
import re
import shutil
import subprocess
import sys
from datetime import datetime, timedelta

# Monitoring-plugin exit codes.
STATE_OK = 0
STATE_WARNING = 1
STATE_CRITICAL = 2
STATE_UNKNOWN = 3

# Matches a trailing UTC offset written with a colon, e.g. "+02:00".
_TZ_COLON_RE = re.compile(r"([+-]\d{2}):(\d{2})$")

# Timestamp layouts accepted by parse_time(), tried in order.
_TIME_FORMATS = (
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M:%S.%f",
)

# Alert "level" value this plugin reports on, per API flavour.
_REPORTED_ALERT_LEVEL = {"jetapi": 128, "jetbackup5api": 4}

# Alerts whose title contains one of these substrings are never reported.
_EXCLUDED_ALERT_TITLES = ("Restore process for the account",)


def run_command(command):
    """Run *command* (an argv list) and return the CompletedProcess.

    Returns None when the command exits non-zero or cannot be started at all
    (missing binary, permission denied), so callers can map any failure to a
    plugin state instead of crashing with a traceback.
    """
    try:
        return subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            universal_newlines=True,
        )
    except (subprocess.SubprocessError, OSError):
        return None


def _find_cli(name):
    """Return the path of executable *name* (PATH, then /usr/bin), or None."""
    found = shutil.which(name)
    if found:
        return found
    fallback = os.path.join("/usr/bin", name)
    return fallback if os.access(fallback, os.X_OK) else None


def _detect_api():
    """Return (kind, executable) for the installed JetBackup API CLI.

    kind is "jetbackup5api" or "jetapi"; both values are None when neither
    CLI is available.
    """
    for kind in ("jetbackup5api", "jetapi"):
        executable = _find_cli(kind)
        if executable:
            return kind, executable
    return None, None


def _api_argv(kind, executable, function, jb5_extra=()):
    """Build the argv for API *function* with JSON output.

    The legacy "jetapi" CLI needs the "backup" sub-command; *jb5_extra* is
    only passed to jetbackup5api.
    """
    if kind == "jetbackup5api":
        return [executable, "-F", function] + list(jb5_extra) + ["-O", "json"]
    return [executable, "backup", "-F", function, "-O", "json"]


def _jobs_from_payload(payload):
    """Return the list of job dicts under payload["data"]["jobs"], or []."""
    data = payload.get("data") if isinstance(payload, dict) else None
    jobs = data.get("jobs") if isinstance(data, dict) else None
    if not isinstance(jobs, list):
        return []
    return [job for job in jobs if isinstance(job, dict)]


def _parse_thresholds():
    """Parse the threshold options from sys.argv, ignoring unknown arguments."""
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument("--cdur", type=int, default=24,
                        help="Critical duration threshold in hours (default: 24)")
    parser.add_argument("--wdur", type=int, default=12,
                        help="Warning duration threshold in hours (default: 12)")
    parser.add_argument("--calert", type=int, default=24,
                        help="Critical alert threshold in hours")
    parser.add_argument("--walert", type=int, default=12,
                        help="Warning alert threshold in hours")
    args, _unknown = parser.parse_known_args()
    return args


def check_jetmongo():
    """Check that the JetBackup MongoDB instance answers db.stats().

    Returns 0 when the auth file is absent (JetBackup not installed) or the
    database responds with ok == 1, otherwise 2.
    """
    auth_file = "/usr/local/jetapps/etc/.mongod.auth"
    if not os.path.isfile(auth_file):
        print("OK: JetBackup is not installed on this server")
        return STATE_OK

    try:
        with open(auth_file) as handle:
            password = next(
                (line.strip().split("=", 1)[1]
                 for line in handle if line.startswith("PASS=")),
                None,
            )
    except (OSError, ValueError) as exc:
        print(f"CRITICAL: Failed to read auth file: {exc}")
        return STATE_CRITICAL
    if not password:
        print("CRITICAL: Password not found in auth file")
        return STATE_CRITICAL

    mongo_paths = (
        "/usr/local/jetapps/usr/bin/mongo",
        "/usr/local/jetapps/usr/bin/mongosh",
    )
    mongo_cli = next((path for path in mongo_paths if os.access(path, os.X_OK)), None)
    if not mongo_cli:
        print("CRITICAL: mongo cli not found")
        return STATE_CRITICAL

    # NOTE(review): the password is passed on the command line and is
    # therefore visible in the process list while the shell runs.
    result = run_command([
        mongo_cli,
        "--port", "27217",
        "--username", "root",
        "--password", password,
        "--authenticationDatabase", "admin",
        "--quiet",
        "jetbackup",
        "--eval", "printjson(db.stats().ok)",
    ])
    if result is None:
        print("CRITICAL: Failed to connect to JetMongo")
        return STATE_CRITICAL
    if result.stdout.strip() != "1":
        print("CRITICAL: JetMongo is not working")
        return STATE_CRITICAL

    print("OK: JetMongo is working")
    return STATE_OK


def check_jetbackup5_license():
    """Return 0 when getDashboardDetails reports licenseIssue false, else 2."""
    result = run_command([
        "sudo", "/usr/bin/jetbackup5api",
        "-F", "getDashboardDetails", "-O", "json",
    ])
    if result is None:
        print("CRITICAL: Failed to run jetbackup5api")
        return STATE_CRITICAL

    # Tolerate optional whitespace around the colon in the JSON output.
    if re.search(r'"licenseIssue"\s*:\s*false\b', result.stdout):
        print("OK: No licensing issues found (JetBackup 5)")
        return STATE_OK
    print("CRITICAL: There are licensing issues with the JetBackup 5 instance")
    return STATE_CRITICAL


def check_jetbackup4_license():
    """Return 2 when licenseStatus output contains "licenseIssue: 1", else 0."""
    result = run_command(["sudo", "/usr/bin/jetapi", "backup", "-F", "licenseStatus"])
    if result is None:
        print("CRITICAL: Failed to run jetapi")
        return STATE_CRITICAL

    if "licenseIssue: 1" in result.stdout:
        print("CRITICAL: There are licensing issues with the JetBackup 4 instance")
        return STATE_CRITICAL
    print("OK: No licensing issues found (JetBackup 4)")
    return STATE_OK


def check_jetbackup_license():
    """Run the JetBackup 5 license check if its CLI is on PATH, else the v4 one."""
    if shutil.which("jetbackup5api"):
        return check_jetbackup5_license()
    return check_jetbackup4_license()


def parse_time(time_str):
    """Parse an ISO-8601 timestamp into a naive datetime in local time.

    A UTC offset ("+02:00", "+0200" or "Z") is honoured, sign included, and
    the instant is converted to the local timezone so the result can be
    compared with datetime.now(). A timestamp without an offset is returned
    as written. Returns None for empty, non-string or unparseable input.
    """
    if not time_str or not isinstance(time_str, str):
        return None

    text = time_str.strip()
    if text.endswith("Z"):
        text = text[:-1] + "+0000"
    else:
        # strptime's %z only accepts "+HHMM" on older Python versions.
        text = _TZ_COLON_RE.sub(r"\1\2", text)

    for fmt in _TIME_FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed
    return None


def check_jetbackup_job_durations():
    """Flag backup jobs that have been running longer than --wdur / --cdur hours.

    A job's age is measured from its "last_run" timestamp and only jobs with
    a truthy "running" field are considered. Returns 0, 1, 2 or 3.
    """
    args = _parse_thresholds()
    critical_hours = args.cdur
    warning_hours = args.wdur

    kind, executable = _detect_api()
    if executable is None:
        print("UNKNOWN: No suitable JetBackup command found")
        return STATE_UNKNOWN

    jobs_output = run_command(_api_argv(kind, executable, "listBackupJobs"))
    if not jobs_output:
        print("UNKNOWN: Failed to get JetBackup job data")
        return STATE_UNKNOWN

    try:
        jobs_data = json.loads(jobs_output.stdout)
    except ValueError:
        print("UNKNOWN: Failed to parse JetBackup jobs JSON")
        return STATE_UNKNOWN

    jobs = _jobs_from_payload(jobs_data)
    if not jobs:
        print("CRITICAL: No backup jobs found")
        return STATE_CRITICAL

    now = datetime.now()
    critical_alerts = []
    warning_alerts = []
    for job in jobs:
        last_run = parse_time(job.get("last_run"))
        if not job.get("running", False) or last_run is None:
            continue
        age = now - last_run
        job_id = str(job.get("_id"))  # join() below needs strings
        if age > timedelta(hours=critical_hours):
            critical_alerts.append(job_id)
        elif age > timedelta(hours=warning_hours):
            warning_alerts.append(job_id)

    if critical_alerts:
        print(f"CRITICAL: BackupJob(s) running > {critical_hours}h: {', '.join(critical_alerts)}")
        return STATE_CRITICAL
    if warning_alerts:
        print(f"WARNING: BackupJob(s) running > {warning_hours}h: {', '.join(warning_alerts)}")
        return STATE_WARNING

    print("OK: JetBackup Queue is ok")
    return STATE_OK


def _get_current_users_count():
    """Return the number of hosting accounts, or None if it cannot be determined.

    Probes cPanel, Plesk and DirectAdmin in that order.
    """
    if os.path.exists("/usr/local/cpanel/cpanel"):
        result = run_command(["whmapi1", "--output=jsonpretty", "get_current_users_count"])
        if result is not None:
            try:
                return int(json.loads(result.stdout)["data"]["users"])
            except (ValueError, KeyError, TypeError):
                pass

    if os.path.exists("/usr/local/psa/version"):
        result = run_command(["plesk", "bin", "customer", "--list"])
        if result is not None:
            return len(result.stdout.splitlines())

    if os.path.exists("/usr/local/directadmin/directadmin"):
        try:
            entries = os.listdir("/usr/local/directadmin/data/users/")
        except OSError:
            entries = None
        if entries is not None:
            # Hidden entries are not counted.
            return sum(1 for entry in entries if not entry.startswith("."))

    return None


def _recent_alert_titles(alerts, hours, source_cmd, now=None):
    """Return titles of reportable alerts created within the last *hours*.

    An alert is reportable when its level equals the value configured for
    *source_cmd* and its title is not excluded. Alerts whose "created"
    timestamp is missing or unparseable are skipped.
    """
    if now is None:
        now = datetime.now()
    threshold = now - timedelta(hours=hours)
    wanted_level = _REPORTED_ALERT_LEVEL.get(source_cmd)

    titles = []
    for alert in alerts:
        if not isinstance(alert, dict):
            continue
        title = alert.get("title") or ""
        if any(excluded in title for excluded in _EXCLUDED_ALERT_TITLES):
            continue
        if wanted_level is None or alert.get("level") != wanted_level:
            continue
        created = parse_time(alert.get("created"))
        if created is not None and created > threshold:
            titles.append(title)
    return titles


def _evaluate_alerts(critical_titles, warning_titles, users_count,
                     found_no_accounts, jb_config_export, jb_download_backup):
    """Map alert findings to a (exit_code, message) pair.

    *users_count* may be None (unknown); it then satisfies neither the
    "has users" nor the "no users" special cases.
    """
    users = "unknown" if users_count is None else users_count
    has_users = users_count is not None and users_count > 0
    no_users = users_count is not None and users_count == 0

    if critical_titles:
        joined = " | ".join(critical_titles)
        single = len(critical_titles) == 1
        if has_users and found_no_accounts and not jb_config_export:
            return STATE_CRITICAL, f"CRITICAL: {joined} ({users} users)"
        if no_users and found_no_accounts:
            return STATE_OK, f"OK: No accounts found for backup ({users} users)"
        if jb_config_export and single:
            return (STATE_WARNING,
                    f"WARNING: JB configurations export Backup job Failed ({users} users)")
        if jb_download_backup and single:
            return STATE_WARNING, f"WARNING: {joined} ({users} users)"
        return STATE_CRITICAL, f"CRITICAL: {joined} ({users} users)"

    if warning_titles:
        if no_users and found_no_accounts:
            return STATE_OK, f"OK: No accounts found for backup ({users} users)"
        return STATE_WARNING, f"WARNING: {' | '.join(warning_titles)} ({users} users)"

    return STATE_OK, f"OK: JetBackup alerts clean ({users} users)"


def check_jetbackup_alerts():
    """Report recent JetBackup alerts according to --calert / --walert.

    Returns 0, 1, 2 or 3.

    NOTE(review): both thresholds select the same alert level, so the
    warning branch is only reachable when --walert exceeds --calert; confirm
    whether a lower alert level was intended for warnings.
    """
    args = _parse_thresholds()

    kind, executable = _detect_api()
    if executable is None:
        print("OK: JetBackup not found on the server")
        return STATE_OK

    jb5_extra = ("-D", "sort[created]=-1")
    output = run_command(_api_argv(kind, executable, "listAlerts", jb5_extra))
    if not output:
        print("UNKNOWN: Failed to retrieve alerts")
        return STATE_UNKNOWN

    try:
        alerts = json.loads(output.stdout)["data"]["alerts"] or []
    except (ValueError, KeyError, TypeError):
        print("UNKNOWN: Failed to parse JetBackup alerts")
        return STATE_UNKNOWN
    alerts = [alert for alert in alerts if isinstance(alert, dict)]

    users_count = _get_current_users_count()
    critical_titles = _recent_alert_titles(alerts, args.calert, kind)
    warning_titles = _recent_alert_titles(alerts, args.walert, kind)

    # These markers are searched across all returned alerts, not only the
    # recent ones.
    found_no_accounts = any(
        "No accounts found for backup" in (a.get("message") or "") for a in alerts)
    jb_config_export = any(
        "JB configurations export Backup job Failed" in (a.get("title") or "")
        for a in alerts)
    jb_download_backup = any(
        "Download process for the account" in (a.get("title") or "") for a in alerts)

    code, message = _evaluate_alerts(
        critical_titles, warning_titles, users_count,
        found_no_accounts, jb_config_export, jb_download_backup,
    )
    print(message)
    return code


def _extract_destinations(jobs):
    """Return the sorted unique destination labels referenced by *jobs*."""
    found = set()
    for job in jobs:
        for dest in job.get("destination_details") or []:
            options = dest.get("options") if isinstance(dest, dict) else None
            if not isinstance(options, dict):
                continue
            host = options.get("host")
            bucket = options.get("bucket")
            if host:
                found.add(str(host))
            elif bucket:
                region = options.get("region", "")
                endpoint = options.get("endpoint", "")
                found.add(f"bucket={bucket}/region={region}/endpoint={endpoint}")
    return sorted(found)


def get_backup_destinations():
    """Print the backup destination(s) of all backup jobs.

    Purely informational: always returns 0, so a failure to list the
    destinations never prevents the remaining checks from running.
    """
    kind, executable = _detect_api()
    result = None
    if executable is not None:
        result = run_command(_api_argv(kind, executable, "listBackupJobs"))

    destinations = []
    if result is not None:
        try:
            destinations = _extract_destinations(
                _jobs_from_payload(json.loads(result.stdout)))
        except ValueError:
            destinations = []

    if destinations:
        print("OK: Backup destination(s):", ",".join(destinations))
    else:
        print("OK: Could not find a backup destination")
    return STATE_OK


def main():
    """Run every check in order and return the first non-OK state (or 0)."""
    if not (shutil.which("jetbackup5api") or shutil.which("jetbackup4api")
            or shutil.which("jetapi")):
        print("OK: JetBackup is not installed on this server")
        return STATE_OK

    if shutil.which("backuply"):
        print("OK: Backuply is installed on this server")
        return STATE_OK

    checks = (
        check_jetmongo,
        get_backup_destinations,
        check_jetbackup_license,
        check_jetbackup_job_durations,
        check_jetbackup_alerts,
    )
    for check_fn in checks:
        status = check_fn()
        if status is None:
            # A check must never end the run with an implicit "OK".
            status = STATE_UNKNOWN
        if status != STATE_OK:
            return status
    return STATE_OK


if __name__ == "__main__":
    try:
        sys.exit(main())
    except Exception as exc:  # top-level boundary: report instead of a traceback
        print(f"UNKNOWN: Unexpected plugin error: {exc}")
        sys.exit(STATE_UNKNOWN)