monitoring/check_proxmox_datastore.py

260 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Icinga2 plugin that monitors the available space on Proxmox VE LVM datastores.
Uses HashiCorp Vault with AppRole authentication to obtain the credentials.
Compatible with Proxmox VE 8.3+ and 9.x.
"""
import argparse
import sys
import requests
import json
import os
from urllib3.exceptions import InsecureRequestWarning
# The HTTP calls in this plugin are made with verify=False, so silence the
# InsecureRequestWarning that urllib3 would otherwise emit for each of them.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
class ProxmoxDatastoreMonitor:
    """Icinga/Nagios check for the space usage of a Proxmox VE datastore.

    The Proxmox credentials are fetched from HashiCorp Vault after an AppRole
    login; the Vault connection settings are read from the ``VAULT_ADDR``,
    ``VAULT_ROLE_ID`` and ``VAULT_SECRET_ID`` environment variables.

    Every failure path ends the process through :meth:`exit_with_status`, so
    the methods below either produce a usable result or do not return at all.

    NOTE(review): all HTTP requests are sent with ``verify=False`` (TLS
    certificates are not validated), including the one carrying the Vault
    ``secret_id``. This is kept as-is for compatibility with self-signed
    setups, but should be reviewed.
    """

    # Plugin exit codes and the label printed in front of the message.
    STATUS_LABELS = {
        0: "OK",
        1: "WARNING",
        2: "CRITICAL",
        3: "UNKNOWN",
    }
    # Timeout, in seconds, applied to every HTTP request.
    REQUEST_TIMEOUT = 10
    # TCP port used for the Proxmox VE API URLs.
    PROXMOX_PORT = 8006

    def __init__(self):
        """Read the Vault settings from the environment; performs no I/O."""
        vault_addr = os.getenv('VAULT_ADDR')
        # A trailing slash would yield "//v1/..." request paths later on.
        self.vault_addr = vault_addr.rstrip('/') if vault_addr else vault_addr
        self.vault_role_id = os.getenv('VAULT_ROLE_ID')
        self.vault_secret_id = os.getenv('VAULT_SECRET_ID')
        self.vault_token = None
        self.proxmox_ticket = None
        self.proxmox_csrf = None

    def exit_with_status(self, status, message, perfdata=None):
        """Print the plugin output line and terminate the process.

        Args:
            status: Exit code (0 OK, 1 WARNING, 2 CRITICAL, 3 UNKNOWN). Any
                other value is reported as UNKNOWN (3).
            message: Human-readable text placed after the status label.
            perfdata: Optional performance data appended after ``" | "``.

        Raises:
            SystemExit: Always, carrying the (normalised) status code.
        """
        if status not in self.STATUS_LABELS:
            # Values outside 0-3 are not valid plugin states.
            status = 3
        output = f"{self.STATUS_LABELS[status]}: {message}"
        if perfdata:
            output += f" | {perfdata}"
        print(output)
        sys.exit(status)

    def vault_auth(self):
        """Log in to Vault through AppRole and store the client token.

        Exits with UNKNOWN when a required environment variable is missing,
        when the request fails, or when the response has no
        ``auth.client_token`` entry.
        """
        if not all([self.vault_addr, self.vault_role_id, self.vault_secret_id]):
            self.exit_with_status(3, "Variabili d'ambiente Vault mancanti (VAULT_ADDR, VAULT_ROLE_ID, VAULT_SECRET_ID)")
        auth_url = f"{self.vault_addr}/v1/auth/approle/login"
        auth_data = {
            "role_id": self.vault_role_id,
            "secret_id": self.vault_secret_id,
        }
        try:
            response = requests.post(
                auth_url, json=auth_data, verify=False, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            self.vault_token = response.json()['auth']['client_token']
        except requests.exceptions.RequestException as e:
            self.exit_with_status(3, f"Errore autenticazione Vault: {str(e)}")
        except (KeyError, TypeError, ValueError):
            # Missing keys, a null "auth" object, or a body that is not JSON.
            self.exit_with_status(3, "Risposta Vault non valida durante l'autenticazione")

    def get_credentials_from_vault(self, display_name):
        """Fetch the Proxmox credentials stored in Vault.

        The secret is read from ``/v1/kv/monitoring/<display_name>``; a Vault
        login is performed first when no token is held yet. Both response
        layouts are accepted: fields directly under ``data`` (KV v1) or
        nested under ``data.data`` (KV v2).

        NOTE(review): KV v2 mounts normally expose secrets under
        ``<mount>/data/<path>``; the path used here has no ``data/`` segment.
        Confirm against the actual mount configuration.

        Args:
            display_name: Last path segment of the secret.

        Returns:
            A ``(username, password)`` tuple.
        """
        if not self.vault_token:
            self.vault_auth()
        secret_url = f"{self.vault_addr}/v1/kv/monitoring/{display_name}"
        headers = {"X-Vault-Token": self.vault_token}
        try:
            response = requests.get(
                secret_url, headers=headers, verify=False, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            secret = response.json()['data']
            # KV v2 wraps the fields in a second "data" object; only treat it
            # as such when it really is a mapping, so that a KV v1 secret with
            # a plain field called "data" is still read correctly.
            nested = secret.get('data') if isinstance(secret, dict) else None
            credentials = nested if isinstance(nested, dict) else secret
            return credentials['username'], credentials['password']
        except requests.exceptions.RequestException as e:
            self.exit_with_status(3, f"Errore recupero credenziali da Vault: {str(e)}")
        except KeyError as e:
            self.exit_with_status(3, f"Credenziali mancanti in Vault: {str(e)}")
        except (TypeError, ValueError):
            # Null payload or a body that is not JSON.
            self.exit_with_status(3, "Risposta Vault non valida durante il recupero delle credenziali")

    def proxmox_auth(self, host, username, password):
        """Request a Proxmox API ticket and store it with its CSRF token.

        Args:
            host: Proxmox host name or IP address.
            username: User name sent to ``/access/ticket``.
            password: Password sent to ``/access/ticket``.

        Exits with UNKNOWN when the request fails or when the response does
        not carry a usable ``data`` object.
        """
        auth_url = f"https://{host}:{self.PROXMOX_PORT}/api2/json/access/ticket"
        auth_data = {
            "username": username,
            "password": password,
        }
        try:
            response = requests.post(
                auth_url, data=auth_data, verify=False, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            ticket_data = response.json().get('data')
            if not ticket_data:
                # Covers both a missing "data" key and "data": null.
                self.exit_with_status(3, "Autenticazione Proxmox fallita: credenziali non valide")
            self.proxmox_ticket = ticket_data['ticket']
            self.proxmox_csrf = ticket_data['CSRFPreventionToken']
        except requests.exceptions.RequestException as e:
            self.exit_with_status(3, f"Errore connessione Proxmox: {str(e)}")
        except (KeyError, TypeError, ValueError, AttributeError) as e:
            # Same wording as the other Proxmox API calls of this class.
            self.exit_with_status(3, f"Risposta API Proxmox non valida: {str(e)}")

    def _proxmox_get(self, host, path):
        """Send an authenticated GET to ``/api2/json/<path>`` and return the response."""
        return requests.get(
            f"https://{host}:{self.PROXMOX_PORT}/api2/json/{path}",
            headers={"CSRFPreventionToken": self.proxmox_csrf},
            cookies={"PVEAuthCookie": self.proxmox_ticket},
            verify=False,
            timeout=self.REQUEST_TIMEOUT,
        )

    def get_datastore_info(self, host, datastore_name):
        """Locate the datastore on the cluster nodes and return its status.

        Nodes are scanned in the order returned by ``/nodes``; the first node
        that lists the datastore and answers the ``status`` call with HTTP 200
        wins. Nodes reported as ``offline`` are skipped so that one
        unreachable node does not abort the whole check.

        Args:
            host: Proxmox host name or IP address.
            datastore_name: Storage identifier to look for.

        Returns:
            The ``data`` dict of the storage status call, with an extra
            ``node`` key naming the node that answered.
        """
        if not self.proxmox_ticket:
            self.exit_with_status(3, "Ticket Proxmox non disponibile")
        # (node, HTTP code) of the last status call that did not return 200.
        status_failure = None
        try:
            nodes_response = self._proxmox_get(host, "nodes")
            nodes_response.raise_for_status()
            for node in nodes_response.json()['data']:
                if node.get('status') == 'offline':
                    continue
                node_name = node['node']
                storage_response = self._proxmox_get(host, f"nodes/{node_name}/storage")
                storage_response.raise_for_status()
                listed = any(
                    storage['storage'] == datastore_name
                    for storage in storage_response.json()['data']
                )
                if not listed:
                    continue
                detail_response = self._proxmox_get(
                    host, f"nodes/{node_name}/storage/{datastore_name}/status"
                )
                if detail_response.status_code != 200:
                    # Remember the failure but keep trying the other nodes.
                    status_failure = (node_name, detail_response.status_code)
                    continue
                datastore_info = detail_response.json()['data']
                datastore_info['node'] = node_name
                return datastore_info
            if status_failure:
                # The datastore exists, so do not claim it was not found.
                failed_node, failed_code = status_failure
                self.exit_with_status(
                    3,
                    f"Stato del datastore '{datastore_name}' non disponibile "
                    f"sul nodo '{failed_node}' (HTTP {failed_code})"
                )
            self.exit_with_status(3, f"Datastore '{datastore_name}' non trovato")
        except requests.exceptions.RequestException as e:
            self.exit_with_status(3, f"Errore API Proxmox: {str(e)}")
        except (KeyError, TypeError, ValueError, AttributeError) as e:
            self.exit_with_status(3, f"Risposta API Proxmox non valida: {str(e)}")

    def bytes_to_human(self, bytes_value):
        """Format a byte count with a 1024-based unit and two decimals.

        Returns ``"N/A"`` for ``None``; values of 1024 PB or more are
        expressed in EB.
        """
        if bytes_value is None:
            return "N/A"
        for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} EB"

    def check_datastore(self, host, display_name, datastore_name, warning_percent, critical_percent):
        """Run the complete check and exit with the resulting plugin status.

        Args:
            host: Proxmox host name or IP address.
            display_name: Name of the Vault secret holding the credentials.
            datastore_name: Storage identifier to check.
            warning_percent: Used-space percentage that triggers WARNING.
            critical_percent: Used-space percentage that triggers CRITICAL.

        Raises:
            SystemExit: Always; the code is the plugin status.
        """
        username, password = self.get_credentials_from_vault(display_name)
        self.proxmox_auth(host, username, password)
        datastore_info = self.get_datastore_info(host, datastore_name)

        # Treat absent or null counters as 0 instead of failing on arithmetic.
        total_bytes = datastore_info.get('total') or 0
        used_bytes = datastore_info.get('used') or 0
        available_bytes = datastore_info.get('avail')
        if available_bytes is None:
            available_bytes = total_bytes - used_bytes
        if total_bytes <= 0:
            self.exit_with_status(3, f"Impossibile determinare la dimensione del datastore {datastore_name}")
        used_percent = (used_bytes / total_bytes) * 100

        total_hr = self.bytes_to_human(total_bytes)
        used_hr = self.bytes_to_human(used_bytes)
        available_hr = self.bytes_to_human(available_bytes)

        # Performance data: label=value[UOM];warn;crit;min;max
        perfdata = (
            f"used={used_bytes}B;{int(total_bytes * warning_percent / 100)};{int(total_bytes * critical_percent / 100)};0;{total_bytes} "
            f"used_percent={used_percent:.2f}%;{warning_percent};{critical_percent};0;100 "
            f"available={available_bytes}B total={total_bytes}B"
        )
        node = datastore_info.get('node', 'unknown')
        message = (
            f"Datastore '{datastore_name}' su nodo '{node}': "
            f"{used_percent:.2f}% utilizzato ({used_hr}/{total_hr}), "
            f"{available_hr} disponibili"
        )

        if used_percent >= critical_percent:
            status = 2
        elif used_percent >= warning_percent:
            status = 1
        else:
            status = 0
        self.exit_with_status(status, message, perfdata)
class _PluginArgumentParser(argparse.ArgumentParser):
    """ArgumentParser whose usage errors follow the monitoring-plugin convention."""

    def error(self, message):
        """Report a command-line error as UNKNOWN (exit code 3).

        The stock implementation exits with status 2, which a monitoring
        system interprets as CRITICAL.
        """
        print(f"UNKNOWN: {message}")
        self.print_usage(sys.stdout)
        sys.exit(3)


def main():
    """Parse the command line, validate the thresholds and run the check.

    Exits with status 3 (UNKNOWN) on invalid arguments or thresholds;
    otherwise the exit status is decided by
    ``ProxmoxDatastoreMonitor.check_datastore``.
    """
    parser = _PluginArgumentParser(description='Monitor Proxmox VE Datastore Space')
    parser.add_argument('-H', '--host', required=True, help='Proxmox host/IP')
    parser.add_argument('-d', '--display-name', required=True, help='Display name per Vault lookup')
    parser.add_argument('-s', '--storage', required=True, help='Nome del datastore da monitorare')
    parser.add_argument('-w', '--warning', type=float, default=80.0, help='Soglia warning in percentuale (default: 80)')
    parser.add_argument('-c', '--critical', type=float, default=90.0, help='Soglia critical in percentuale (default: 90)')
    args = parser.parse_args()

    # Threshold validation.
    if args.warning >= args.critical:
        print("UNKNOWN: La soglia warning deve essere inferiore a critical")
        sys.exit(3)
    # Written as a chained comparison so that NaN thresholds are rejected too
    # (every comparison with NaN is false).
    if not (0 <= args.warning <= 100 and 0 <= args.critical <= 100):
        print("UNKNOWN: Le soglie devono essere comprese tra 0 e 100")
        sys.exit(3)

    monitor = ProxmoxDatastoreMonitor()
    monitor.check_datastore(
        args.host,
        args.display_name,
        args.storage,
        args.warning,
        args.critical,
    )
# Script entry point: run the check only when the file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()