#!/usr/bin/env python3
"""Icinga2 plugin that monitors free space on Proxmox VE LVM datastores.

Credentials for the Proxmox API are fetched from HashiCorp Vault using
AppRole authentication.

Compatible with Proxmox VE 8.3+ and 9.x.
"""

import argparse
import json
import os
import sys

try:
    import requests
    from urllib3.exceptions import InsecureRequestWarning
except ImportError:
    # Keep the module importable; main() reports the missing dependency as
    # UNKNOWN instead of letting Python exit with a traceback (exit code 1,
    # which Icinga would interpret as WARNING).
    requests = None
else:
    # Every HTTP call below uses verify=False, so silence the warning that
    # urllib3 would otherwise emit for each unverified TLS request.
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Timeout, in seconds, applied to every Vault and Proxmox HTTP request.
HTTP_TIMEOUT = 10


class ProxmoxDatastoreMonitor:
    """Check the usage of one Proxmox VE datastore and report it to Icinga.

    The Vault connection settings are read from the environment variables
    VAULT_ADDR, VAULT_ROLE_ID and VAULT_SECRET_ID.

    NOTE(review): TLS certificate verification is disabled (verify=False) for
    both Vault and Proxmox requests; credentials travel over an
    unauthenticated channel. Consider making this configurable.
    """

    def __init__(self):
        """Read the Vault settings from the environment; no I/O is done."""
        self.vault_addr = os.getenv('VAULT_ADDR')
        self.vault_role_id = os.getenv('VAULT_ROLE_ID')
        self.vault_secret_id = os.getenv('VAULT_SECRET_ID')
        self.vault_token = None
        self.proxmox_ticket = None
        self.proxmox_csrf = None

    def exit_with_status(self, status, message, perfdata=None):
        """Print a plugin result line and terminate the process.

        Args:
            status: Exit code: 0 OK, 1 WARNING, 2 CRITICAL, 3 UNKNOWN.
            message: Human-readable text placed after the status label.
            perfdata: Optional performance data appended after ``" | "``.

        Raises:
            SystemExit: Always, carrying ``status`` as the exit code.
            KeyError: If ``status`` is not one of the four codes above.
        """
        status_map = {
            0: "OK",
            1: "WARNING",
            2: "CRITICAL",
            3: "UNKNOWN",
        }

        output = f"{status_map[status]}: {message}"
        if perfdata:
            output += f" | {perfdata}"

        print(output)
        sys.exit(status)

    def vault_auth(self):
        """Log in to Vault with AppRole and store the client token.

        Exits with UNKNOWN when the environment variables are missing, the
        request fails, or the response does not contain a client token.
        """
        if not all([self.vault_addr, self.vault_role_id, self.vault_secret_id]):
            self.exit_with_status(
                3,
                "Variabili d'ambiente Vault mancanti "
                "(VAULT_ADDR, VAULT_ROLE_ID, VAULT_SECRET_ID)",
            )

        auth_url = f"{self.vault_addr}/v1/auth/approle/login"
        auth_data = {
            "role_id": self.vault_role_id,
            "secret_id": self.vault_secret_id,
        }

        try:
            response = requests.post(
                auth_url, json=auth_data, verify=False, timeout=HTTP_TIMEOUT
            )
            response.raise_for_status()
            self.vault_token = response.json()['auth']['client_token']
        except requests.exceptions.RequestException as e:
            self.exit_with_status(3, f"Errore autenticazione Vault: {str(e)}")
        except (KeyError, TypeError, ValueError):
            # Missing keys, a null "auth" object, or a body that is not JSON.
            self.exit_with_status(
                3, "Risposta Vault non valida durante l'autenticazione"
            )

    def get_credentials_from_vault(self, display_name):
        """Fetch the Proxmox username and password from Vault.

        Authenticates first when no Vault token is held yet. The secret is
        read from ``kv/monitoring/<display_name>``.

        Args:
            display_name: Last path component of the secret in Vault.

        Returns:
            A ``(username, password)`` tuple.
        """
        if not self.vault_token:
            self.vault_auth()

        secret_url = f"{self.vault_addr}/v1/kv/monitoring/{display_name}"
        headers = {"X-Vault-Token": self.vault_token}

        try:
            response = requests.get(
                secret_url, headers=headers, verify=False, timeout=HTTP_TIMEOUT
            )
            response.raise_for_status()
            secret_data = response.json()

            # KV v2 nests the secret one level deeper ("data.data") than KV v1.
            if 'data' in secret_data and 'data' in secret_data['data']:
                credentials = secret_data['data']['data']
            else:
                credentials = secret_data['data']

            return credentials['username'], credentials['password']
        except requests.exceptions.RequestException as e:
            self.exit_with_status(
                3, f"Errore recupero credenziali da Vault: {str(e)}"
            )
        except (KeyError, TypeError, ValueError) as e:
            # Missing keys, a null payload, or a body that is not JSON.
            self.exit_with_status(3, f"Credenziali mancanti in Vault: {str(e)}")

    def proxmox_auth(self, host, username, password):
        """Obtain a Proxmox VE API ticket and CSRF token.

        Args:
            host: Proxmox host name or IP; the API is reached on port 8006.
            username: Proxmox user name sent to ``/access/ticket``.
            password: Password for ``username``.

        On success the ticket and CSRF token are stored on the instance;
        any failure exits with UNKNOWN.
        """
        auth_url = f"https://{host}:8006/api2/json/access/ticket"
        auth_data = {
            "username": username,
            "password": password,
        }

        try:
            response = requests.post(
                auth_url, data=auth_data, verify=False, timeout=HTTP_TIMEOUT
            )
            response.raise_for_status()
            auth_result = response.json()

            # Treat an absent or null "data" member as an authentication
            # failure rather than letting the key lookups below crash.
            ticket_data = (
                auth_result.get('data') if isinstance(auth_result, dict) else None
            )
            if not ticket_data:
                self.exit_with_status(
                    3, "Autenticazione Proxmox fallita: credenziali non valide"
                )

            self.proxmox_ticket = ticket_data['ticket']
            self.proxmox_csrf = ticket_data['CSRFPreventionToken']
        except requests.exceptions.RequestException as e:
            self.exit_with_status(3, f"Errore connessione Proxmox: {str(e)}")
        except (KeyError, TypeError, ValueError) as e:
            self.exit_with_status(3, f"Risposta API Proxmox non valida: {str(e)}")

    def get_datastore_info(self, host, datastore_name):
        """Return the status of ``datastore_name`` from the first node having it.

        Nodes are visited in the order returned by the API. Nodes reported
        as offline are skipped so that one unreachable cluster member does
        not abort the whole check.

        Args:
            host: Proxmox host name or IP used for the API calls.
            datastore_name: Storage identifier to look for.

        Returns:
            The storage status dictionary returned by Proxmox, with an added
            ``node`` key naming the node it was read from.
        """
        if not self.proxmox_ticket:
            self.exit_with_status(3, "Ticket Proxmox non disponibile")

        nodes_url = f"https://{host}:8006/api2/json/nodes"
        request_options = {
            "headers": {"CSRFPreventionToken": self.proxmox_csrf},
            "cookies": {"PVEAuthCookie": self.proxmox_ticket},
            "verify": False,
            "timeout": HTTP_TIMEOUT,
        }

        try:
            response = requests.get(nodes_url, **request_options)
            response.raise_for_status()

            for node in response.json()['data']:
                node_name = node['node']
                if node.get('status') == 'offline':
                    continue

                response = requests.get(
                    f"{nodes_url}/{node_name}/storage", **request_options
                )
                response.raise_for_status()

                storages = response.json()['data']
                if not any(s['storage'] == datastore_name for s in storages):
                    continue

                detail_response = requests.get(
                    f"{nodes_url}/{node_name}/storage/{datastore_name}/status",
                    **request_options,
                )
                # A failed status query is not fatal: another node may still
                # be able to report on the same datastore.
                if detail_response.status_code != 200:
                    continue

                datastore_info = detail_response.json()['data']
                datastore_info['node'] = node_name
                return datastore_info

            self.exit_with_status(3, f"Datastore '{datastore_name}' non trovato")
        except requests.exceptions.RequestException as e:
            self.exit_with_status(3, f"Errore API Proxmox: {str(e)}")
        except (KeyError, TypeError, ValueError) as e:
            self.exit_with_status(3, f"Risposta API Proxmox non valida: {str(e)}")

    def bytes_to_human(self, bytes_value):
        """Format a byte count using 1024-based units with two decimals.

        Args:
            bytes_value: Number of bytes, or ``None``.

        Returns:
            ``"N/A"`` for ``None``; otherwise a string such as ``"1.50 KB"``.
        """
        if bytes_value is None:
            return "N/A"

        for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} EB"

    def check_datastore(self, host, display_name, datastore_name,
                        warning_percent, critical_percent):
        """Run the complete check and exit with the resulting plugin status.

        Args:
            host: Proxmox host name or IP.
            display_name: Name of the Vault secret holding the credentials.
            datastore_name: Storage identifier to check.
            warning_percent: Used-space percentage that triggers WARNING.
            critical_percent: Used-space percentage that triggers CRITICAL.

        Raises:
            SystemExit: Always; the exit code is the plugin status.
        """
        username, password = self.get_credentials_from_vault(display_name)
        self.proxmox_auth(host, username, password)
        datastore_info = self.get_datastore_info(host, datastore_name)

        # "or 0" also covers keys that are present but null in the response.
        total_bytes = datastore_info.get('total') or 0
        used_bytes = datastore_info.get('used') or 0
        available_bytes = datastore_info.get('avail')
        if available_bytes is None:
            available_bytes = total_bytes - used_bytes

        if total_bytes == 0:
            self.exit_with_status(
                3,
                f"Impossibile determinare la dimensione del datastore {datastore_name}",
            )

        used_percent = (used_bytes / total_bytes) * 100

        total_hr = self.bytes_to_human(total_bytes)
        used_hr = self.bytes_to_human(used_bytes)
        available_hr = self.bytes_to_human(available_bytes)

        # Thresholds for the byte-valued metric are the percentage thresholds
        # converted to absolute bytes of this datastore.
        warning_bytes = int(total_bytes * warning_percent / 100)
        critical_bytes = int(total_bytes * critical_percent / 100)
        perfdata = (
            f"used={used_bytes}B;{warning_bytes};{critical_bytes};0;{total_bytes} "
            f"used_percent={used_percent:.2f}%;{warning_percent};{critical_percent};0;100 "
            f"available={available_bytes}B total={total_bytes}B"
        )

        node = datastore_info.get('node', 'unknown')
        message = (
            f"Datastore '{datastore_name}' su nodo '{node}': "
            f"{used_percent:.2f}% utilizzato ({used_hr}/{total_hr}), "
            f"{available_hr} disponibili"
        )

        if used_percent >= critical_percent:
            self.exit_with_status(2, message, perfdata)
        elif used_percent >= warning_percent:
            self.exit_with_status(1, message, perfdata)
        else:
            self.exit_with_status(0, message, perfdata)


def main():
    """Parse the command line, validate the thresholds and run the check."""
    parser = argparse.ArgumentParser(description='Monitor Proxmox VE Datastore Space')
    parser.add_argument('-H', '--host', required=True,
                        help='Proxmox host/IP')
    parser.add_argument('-d', '--display-name', required=True,
                        help='Display name per Vault lookup')
    parser.add_argument('-s', '--storage', required=True,
                        help='Nome del datastore da monitorare')
    parser.add_argument('-w', '--warning', type=float, default=80.0,
                        help='Soglia warning in percentuale (default: 80)')
    parser.add_argument('-c', '--critical', type=float, default=90.0,
                        help='Soglia critical in percentuale (default: 90)')

    args = parser.parse_args()

    # Both tests are phrased as "not (valid)" so that NaN, for which every
    # comparison is false, is rejected instead of slipping through.
    if not args.warning < args.critical:
        print("UNKNOWN: La soglia warning deve essere inferiore a critical")
        sys.exit(3)

    if not (0 <= args.warning <= 100 and 0 <= args.critical <= 100):
        print("UNKNOWN: Le soglie devono essere comprese tra 0 e 100")
        sys.exit(3)

    if requests is None:
        print("UNKNOWN: Modulo Python 'requests' non installato")
        sys.exit(3)

    monitor = ProxmoxDatastoreMonitor()
    try:
        monitor.check_datastore(
            args.host,
            args.display_name,
            args.storage,
            args.warning,
            args.critical,
        )
    except Exception as e:
        # Last-resort boundary: an uncaught exception would end the process
        # with exit code 1, which Icinga reads as WARNING. SystemExit raised
        # by exit_with_status is not an Exception and passes through.
        print(f"UNKNOWN: Errore imprevisto: {e}")
        sys.exit(3)


if __name__ == "__main__":
    main()