#!/usr/bin/env python3
"""
Get Milvus Usage Statistics

This script connects to Milvus and PostgreSQL to retrieve comprehensive usage statistics
including collection counts, row counts, storage information, and tenant/knowledge-base details.

Requirements:
    pip install pymilvus psycopg2-binary

Environment Variables:
    WXO_MILVUS_URI - Milvus connection URI (e.g., http://localhost:19530)
    MILVUS_DB_NAME - Milvus database name (default: default)
    MILVUS_TLS_CERT_PATH - Path to Milvus server TLS certificate PEM file (default: /opt/secret/milvus/tls.crt)
    POSTGRES_URL - PostgreSQL connection URL (e.g., postgresql://user:pass@host:port/dbname)
    POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB - Used only when the URL is built from the
        username/password files (defaults: localhost, 5432, postgres)

Credentials:
    Milvus - Reads from files, falling back to the MILVUS_USERNAME / MILVUS_PASSWORD
    environment variables when a file is missing:
        - /opt/secret/milvus/MILVUS_USERNAME
        - /opt/secret/milvus/MILVUS_PASSWORD

    PostgreSQL - Resolved in this order of preference:
        - POSTGRES_URL environment variable
        - /opt/secret/postgres/DB_CONNECTION_URI_ARCHER (full connection URI)
        - /opt/secret/postgres/POSTGRES_USERNAME and /opt/secret/postgres/POSTGRES_PASSWORD

Usage:
    # Basic usage - display stats as table with tenant/KB info (default)
    python get_milvus_usage_stats.py

    # Skip tenant and knowledge-base information from PostgreSQL
    python get_milvus_usage_stats.py --skip-tenant-info

    # Output as JSON
    python get_milvus_usage_stats.py --output-format json

    # Output as CSV
    python get_milvus_usage_stats.py --output-format csv

    # Save to file
    python get_milvus_usage_stats.py --output stats.json --output-format json

    # Show detailed collection stats
    python get_milvus_usage_stats.py --detailed
"""

import argparse
import csv
import json
import logging
import os
import sys
from io import StringIO
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote, urlparse

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Tenant / knowledge-base keys copied from the PostgreSQL mapping onto every
# collection record. Kept in one place so all producers and formatters agree.
_TENANT_FIELDS = (
    'tenant_id',
    'tenant_name',
    'kb_id',
    'kb_name',
    'kb_display_name',
    'kb_created_on',
)


class MilvusUsageStats:
    """Collects and formats Milvus usage statistics with optional PostgreSQL tenant information."""

    def __init__(self):
        """Create an unconnected collector; connections are opened lazily."""
        self.milvus_client = None
        self.milvus_uri = None
        self.db_name = None
        self.username = None
        self.password = None
        # None = not resolved yet, "" = resolved and TLS cert unavailable.
        self.server_pem_path = None
        self.pg_conn = None
        self.pg_url = None
        self.tenant_kb_map = {}  # Maps milvus_collection_name to tenant/KB info

    def _read_milvus_credentials(self) -> Tuple[str, str]:
        """Read Milvus credentials from the secret files.

        Falls back to the MILVUS_USERNAME / MILVUS_PASSWORD environment
        variables (empty string when unset) if either file is missing.

        Returns:
            A ``(username, password)`` tuple.

        Raises:
            Exception: Any error other than a missing file is logged and re-raised.
        """
        try:
            with open('/opt/secret/milvus/MILVUS_USERNAME', 'r', encoding='utf-8') as f:
                username = f.read().strip()
            with open('/opt/secret/milvus/MILVUS_PASSWORD', 'r', encoding='utf-8') as f:
                password = f.read().strip()
            logger.info("Successfully read Milvus credentials from files")
            return username, password
        except FileNotFoundError as e:
            logger.warning(f"Milvus credential file not found: {e}")
            logger.info("Falling back to environment variables")
            username = os.getenv('MILVUS_USERNAME', '')
            password = os.getenv('MILVUS_PASSWORD', '')
            return username, password
        except Exception as e:
            logger.error(f"Error reading Milvus credentials: {e}")
            raise

    def _read_postgres_credentials(self) -> Tuple[str, str]:
        """Read PostgreSQL credentials from the secret files.

        Returns:
            A ``(username, password)`` tuple, or ``('', '')`` if the files are
            missing or unreadable (errors are logged, never raised).
        """
        try:
            with open('/opt/secret/postgres/POSTGRES_USERNAME', 'r', encoding='utf-8') as f:
                username = f.read().strip()
            with open('/opt/secret/postgres/POSTGRES_PASSWORD', 'r', encoding='utf-8') as f:
                password = f.read().strip()
            logger.info("Successfully read PostgreSQL credentials from files")
            return username, password
        except FileNotFoundError as e:
            logger.warning(f"PostgreSQL credential file not found: {e}")
            return '', ''
        except Exception as e:
            logger.error(f"Error reading PostgreSQL credentials: {e}")
            return '', ''

    @staticmethod
    def _build_postgres_url(username: str, password: str, host: str, port: str, dbname: str) -> str:
        """Build a PostgreSQL connection URI from its parts.

        The username and password are percent-encoded so that reserved
        characters such as ``@``, ``:`` or ``/`` cannot corrupt the URI.
        """
        user = quote(username, safe='')
        secret = quote(password, safe='')
        return f"postgresql://{user}:{secret}@{host}:{port}/{dbname}"

    def _read_postgres_uri_file(self) -> Optional[str]:
        """Return the URI stored in the DB_CONNECTION_URI_ARCHER file, or None if unavailable."""
        try:
            with open('/opt/secret/postgres/DB_CONNECTION_URI_ARCHER', 'r', encoding='utf-8') as f:
                uri = f.read().strip()
            logger.info("Read PostgreSQL URL from DB_CONNECTION_URI_ARCHER file")
            return uri
        except FileNotFoundError:
            logger.debug("DB_CONNECTION_URI_ARCHER file not found")
        except Exception as e:
            logger.debug(f"Error reading DB_CONNECTION_URI_ARCHER: {e}")
        return None

    def _init_postgres_connection(self) -> bool:
        """Initialize the PostgreSQL connection.

        The connection URL is taken from the POSTGRES_URL environment variable,
        then the DB_CONNECTION_URI_ARCHER file, and finally built from the
        username/password files plus POSTGRES_HOST / POSTGRES_PORT / POSTGRES_DB.

        Returns:
            True when ``self.pg_conn`` was opened, False otherwise (every
            failure is logged rather than raised).
        """
        try:
            import psycopg2
            from psycopg2.extras import RealDictCursor
        except ImportError:
            logger.error("psycopg2 not installed. Install with: pip install psycopg2-binary")
            return False

        try:
            # Get PostgreSQL URL - try multiple sources
            self.pg_url = os.getenv('POSTGRES_URL')
            if not self.pg_url:
                self.pg_url = self._read_postgres_uri_file()

            if not self.pg_url:
                logger.warning("POSTGRES_URL environment variable not set and DB_CONNECTION_URI_ARCHER file not found")
                # Try to construct from credentials
                username, password = self._read_postgres_credentials()
                if not (username and password):
                    logger.warning("Cannot initialize PostgreSQL connection without credentials")
                    return False
                # Construct URL from environment or defaults
                self.pg_url = self._build_postgres_url(
                    username,
                    password,
                    os.getenv('POSTGRES_HOST', 'localhost'),
                    os.getenv('POSTGRES_PORT', '5432'),
                    os.getenv('POSTGRES_DB', 'postgres'),
                )

            logger.info("Connecting to PostgreSQL...")
            self.pg_conn = psycopg2.connect(self.pg_url, cursor_factory=RealDictCursor)
            logger.info("✓ Connected successfully to PostgreSQL")
            return True
        except Exception as e:
            logger.error(f"Failed to initialize PostgreSQL connection: {e}")
            return False

    def query_tenant_kb_info(self) -> Dict[str, Dict]:
        """
        Query tenant and knowledge-base information from PostgreSQL.

        Returns a dictionary mapping milvus_collection_name to tenant/KB details.
        Rows without a collection name (no vector index joined) are skipped.
        Returns an empty dictionary when no connection is open or the query fails.
        """
        if not self.pg_conn:
            logger.warning("PostgreSQL connection not initialized, skipping tenant info")
            return {}

        # Query based on step1_query_tenant_collections.sql
        query = """
            SELECT
                t.id::text as tenant_id,
                t.name as tenant_name,
                kb.id::text as kb_id,
                kb.name as kb_name,
                kb.display_name as kb_display_name,
                vi.id::text as vector_index_id,
                vi.name as vector_index_name,
                'collection_' || REPLACE(REPLACE(vi.id::text, '-', '_'), '/', '_') as milvus_collection_name,
                kb.created_on as kb_created_on
            FROM tenants t
            LEFT JOIN knowledge_bases kb ON kb.tenant_id = t.id
            LEFT JOIN vector_indices vi ON vi.id = kb.vector_index_id
            WHERE kb.id IS NOT NULL AND kb.prioritize_built_in_index is true
            ORDER BY t.name, kb.name
        """

        try:
            # The context manager closes the cursor even if the query raises.
            with self.pg_conn.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()

            logger.info(f"Found {len(rows)} tenant/knowledge-base records in PostgreSQL")

            # Build mapping
            tenant_kb_map = {}
            for row in rows:
                collection_name = row['milvus_collection_name']
                if not collection_name:
                    # The LEFT JOIN yields NULL when the KB has no vector index;
                    # such rows cannot correspond to any Milvus collection.
                    continue
                created_on = row['kb_created_on']
                tenant_kb_map[collection_name] = {
                    'tenant_id': row['tenant_id'],
                    'tenant_name': row['tenant_name'],
                    'kb_id': row['kb_id'],
                    'kb_name': row['kb_name'],
                    'kb_display_name': row['kb_display_name'],
                    'kb_created_on': created_on.isoformat() if created_on else None,
                }
            return tenant_kb_map
        except Exception as e:
            logger.error(f"Error querying tenant/KB info from PostgreSQL: {e}")
            return {}

    def _close_milvus_client(self) -> bool:
        """Close and forget the current Milvus client.

        Returns:
            True if a client was present (whether or not closing it succeeded).
        """
        client, self.milvus_client = self.milvus_client, None
        if not client:
            return False
        try:
            client.close()
        except Exception as e:
            logger.warning(f"Error closing Milvus client: {e}")
        return True

    def _init_milvus_client(self, db_name: Optional[str] = None):
        """Initialize the Milvus client for a specific database.

        Any previously opened client is closed first so that switching
        databases does not leak connections.

        Args:
            db_name: Database to connect to; defaults to the MILVUS_DB_NAME
                environment variable, or ``'default'``.

        Raises:
            ImportError: If pymilvus is not installed.
            ValueError: If WXO_MILVUS_URI is not set.
            Exception: Any connection error is logged and re-raised.
        """
        try:
            from pymilvus import MilvusClient
        except ImportError as err:
            raise ImportError("pymilvus not installed. Install with: pip install pymilvus") from err

        try:
            # Get Milvus URI
            if not self.milvus_uri:
                self.milvus_uri = os.getenv('WXO_MILVUS_URI')
            if not self.milvus_uri:
                raise ValueError("WXO_MILVUS_URI environment variable is required")

            # Read credentials if not already done
            if not self.username or not self.password:
                self.username, self.password = self._read_milvus_credentials()

            # Get database name
            if db_name is None:
                db_name = os.getenv('MILVUS_DB_NAME', 'default')
            self.db_name = db_name

            # Create token from username and password
            token = f"{self.username}:{self.password}" if self.username and self.password else ""

            # Resolve the TLS cert path once; later re-inits reuse the result
            # instead of re-checking the filesystem and re-logging the warning.
            if self.server_pem_path is None:
                pem_path = os.environ.get("MILVUS_TLS_CERT_PATH", "/opt/secret/milvus/tls.crt")
                if os.path.exists(pem_path):
                    logger.info(f"Using TLS cert: {pem_path}")
                    self.server_pem_path = pem_path
                else:
                    logger.warning(f"TLS cert not found at {pem_path}, proceeding without TLS")
                    self.server_pem_path = ""

            # Close the old client BEFORE creating the new one.
            # NOTE(review): some pymilvus versions appear to share a connection
            # alias between clients of the same URI/credentials - confirm.
            self._close_milvus_client()

            logger.info(f"Connecting to Milvus at {self.milvus_uri}, database: {self.db_name}")

            # Create client using token (not separate username/password)
            client_kwargs = {
                'uri': self.milvus_uri,
                'token': token,
                'db_name': self.db_name,
            }
            if self.server_pem_path:
                client_kwargs['server_pem_path'] = self.server_pem_path
            self.milvus_client = MilvusClient(**client_kwargs)

            logger.info(f"✓ Connected successfully to Milvus database: {self.db_name}")
        except Exception as e:
            logger.error(f"Failed to initialize Milvus client: {e}")
            raise

    def list_databases(self) -> List[str]:
        """List all databases in Milvus.

        Falls back to the current (or ``'default'``) database when the server
        refuses or does not support the listing call.
        """
        if not self.milvus_client:
            self._init_milvus_client()

        try:
            # List databases using MilvusClient
            databases = self.milvus_client.list_databases()
            logger.info(f"Found {len(databases)} databases: {databases}")
            return databases
        except Exception as e:
            logger.warning(f"Could not list databases: {e}. Using default database only.")
            # Fall back to just the current database
            return [self.db_name or 'default']

    def get_collection_stats(self, collection_name: str, include_tenant_info: bool = True) -> Dict:
        """Get detailed statistics for a single collection with optional tenant/KB info.

        Args:
            collection_name: Milvus collection to inspect in the current database.
            include_tenant_info: When True, the tenant/KB keys are always added
                to the result; their values are None for collections that have
                no entry in ``self.tenant_kb_map``.

        Returns:
            A dict with ``collection_name``, ``exists`` and ``row_count``, plus
            ``raw_stats`` on success or ``error`` on failure. Never raises.
        """
        result = {'collection_name': collection_name, 'exists': False, 'row_count': 0}

        if not self.milvus_client:
            result['error'] = 'Milvus client not initialized'
        else:
            try:
                if self.milvus_client.has_collection(collection_name):
                    raw_stats = self.milvus_client.get_collection_stats(collection_name)
                    result['exists'] = True
                    result['row_count'] = int(raw_stats.get('row_count', 0))
                    result['raw_stats'] = raw_stats
                else:
                    result['error'] = 'Collection does not exist'
            except Exception as e:
                logger.error(f"Error getting stats for collection {collection_name}: {e}")
                result = {
                    'collection_name': collection_name,
                    'exists': False,
                    'row_count': 0,
                    'error': str(e),
                }

        if include_tenant_info:
            # An empty mapping yields None for every field: the collection
            # exists in Milvus but is unknown to PostgreSQL.
            tenant_info = self.tenant_kb_map.get(collection_name, {})
            result.update({field: tenant_info.get(field) for field in _TENANT_FIELDS})

        return result

    def get_database_stats(self, db_name: str, detailed: bool = False, include_tenant_info: bool = True) -> Dict:
        """Get statistics for a single database with optional tenant/KB info.

        Args:
            db_name: Milvus database to connect to and inspect.
            detailed: When True, keep the full per-collection records
                (including ``raw_stats``); otherwise emit simplified records.
            include_tenant_info: Forwarded to :meth:`get_collection_stats`.

        Returns:
            A per-database summary dict. On any failure (including a failed
            connection) the counters are zero and an ``error`` key is present.
        """
        try:
            # Connect to this specific database. Done inside the try so that a
            # single unreachable database is reported instead of aborting the run.
            self._init_milvus_client(db_name)

            # List all collections in this database
            collection_names = self.milvus_client.list_collections()
            logger.info(f"Database '{db_name}': Found {len(collection_names)} collections")

            # Gather stats for each collection
            collection_stats = []
            total_rows = 0
            collections_with_data = 0

            for collection_name in collection_names:
                logger.info(f" Querying collection: {collection_name}")
                stats = self.get_collection_stats(collection_name, include_tenant_info)

                if stats.get('exists'):
                    row_count = stats.get('row_count', 0)
                    total_rows += row_count
                    if row_count > 0:
                        collections_with_data += 1

                if detailed:
                    collection_stats.append(stats)
                    continue

                # Simplified stats without raw_stats
                simplified = {
                    'collection_name': stats['collection_name'],
                    'exists': stats['exists'],
                    'row_count': stats['row_count'],
                    'error': stats.get('error', ''),
                }
                if include_tenant_info:
                    simplified.update({field: stats.get(field) for field in _TENANT_FIELDS})
                collection_stats.append(simplified)

            return {
                'database': db_name,
                'total_collections': len(collection_names),
                'collections_with_data': collections_with_data,
                'empty_collections': len(collection_names) - collections_with_data,
                'total_rows': total_rows,
                'collections': collection_stats,
            }
        except Exception as e:
            logger.error(f"Error gathering stats for database {db_name}: {e}")
            return {
                'database': db_name,
                'total_collections': 0,
                'collections_with_data': 0,
                'empty_collections': 0,
                'total_rows': 0,
                'collections': [],
                'error': str(e),
            }

    def get_all_usage_stats(self, detailed: bool = False, include_tenant_info: bool = True) -> Dict:
        """Get comprehensive Milvus usage statistics across all databases with optional tenant/KB info.

        Args:
            detailed: Forwarded to :meth:`get_database_stats`.
            include_tenant_info: When True, load the tenant/KB mapping from
                PostgreSQL first; a failed PostgreSQL connection is logged and
                the run continues without tenant data.

        Returns:
            A summary dict with totals and a ``databases`` list.

        Raises:
            Exception: If the initial Milvus connection fails, or an unexpected
                error occurs while aggregating (logged and re-raised).
        """
        if not self.milvus_client:
            self._init_milvus_client()

        # Query tenant/KB info from PostgreSQL if requested
        if include_tenant_info:
            logger.info("\n--- Querying PostgreSQL for tenant/KB information ---")
            if self._init_postgres_connection():
                self.tenant_kb_map = self.query_tenant_kb_info()
                logger.info(f"Loaded {len(self.tenant_kb_map)} tenant/KB mappings")
            else:
                logger.warning("Could not connect to PostgreSQL, proceeding without tenant info")

        try:
            # List all databases
            databases = self.list_databases()
            logger.info(f"\nQuerying {len(databases)} database(s)...")

            # Gather stats for each database
            database_stats = []
            for db_name in databases:
                logger.info(f"\n--- Processing database: {db_name} ---")
                database_stats.append(
                    self.get_database_stats(db_name, detailed, include_tenant_info)
                )

            total_collections = sum(db['total_collections'] for db in database_stats)
            total_collections_with_data = sum(db['collections_with_data'] for db in database_stats)
            total_rows = sum(db['total_rows'] for db in database_stats)
            empty_collections = total_collections - total_collections_with_data

            # Build summary
            summary = {
                'milvus_uri': self.milvus_uri,
                'total_databases': len(databases),
                'total_collections': total_collections,
                'collections_with_data': total_collections_with_data,
                'empty_collections': empty_collections,
                'total_rows': total_rows,
                'databases': database_stats,
            }
            if include_tenant_info:
                summary['tenant_kb_mappings_loaded'] = len(self.tenant_kb_map)

            logger.info("\n✓ Statistics gathered successfully")
            logger.info(f" Total databases: {len(databases)}")
            logger.info(f" Total collections: {total_collections}")
            logger.info(f" Collections with data: {total_collections_with_data}")
            logger.info(f" Empty collections: {empty_collections}")
            logger.info(f" Total rows: {total_rows:,}")
            if include_tenant_info:
                logger.info(f" Tenant/KB mappings: {len(self.tenant_kb_map)}")

            return summary
        except Exception as e:
            logger.error(f"Error gathering usage stats: {e}")
            raise

    def format_as_json(self, stats: Dict) -> str:
        """Format statistics as JSON (2-space indented)."""
        return json.dumps(stats, indent=2)

    def format_as_csv(self, stats: Dict) -> str:
        """Format statistics as CSV.

        The output is a ``Metric,Value`` summary section, a blank row, then one
        row per collection. Tenant/KB columns are emitted when any collection
        in any database carries tenant information.
        """
        output = StringIO()
        writer = csv.writer(output)

        # Write summary header
        writer.writerow(['Metric', 'Value'])
        writer.writerow(['Milvus URI', stats['milvus_uri']])
        writer.writerow(['Total Databases', stats['total_databases']])
        writer.writerow(['Total Collections', stats['total_collections']])
        writer.writerow(['Collections with Data', stats['collections_with_data']])
        writer.writerow(['Empty Collections', stats['empty_collections']])
        writer.writerow(['Total Rows', stats['total_rows']])
        if 'tenant_kb_mappings_loaded' in stats:
            writer.writerow(['Tenant/KB Mappings', stats['tenant_kb_mappings_loaded']])
        writer.writerow([])

        # Look across ALL databases: the first one may be empty or errored,
        # which must not hide the tenant columns of the others.
        has_tenant_info = any(
            'tenant_id' in collection
            for db_stats in stats['databases']
            for collection in db_stats.get('collections', [])
        )

        header = ['Database', 'Collection Name', 'Exists', 'Row Count']
        if has_tenant_info:
            header += [
                'Tenant ID', 'Tenant Name', 'KB ID', 'KB Name',
                'KB Display Name', 'KB Created On',
            ]
        header.append('Error')
        writer.writerow(header)

        # Write collection details per database
        for db_stats in stats['databases']:
            db_name = db_stats['database']
            for collection in db_stats['collections']:
                row = [
                    db_name,
                    collection['collection_name'],
                    collection['exists'],
                    collection['row_count'],
                ]
                if has_tenant_info:
                    # csv writes None as an empty field.
                    row.extend(collection.get(field, '') for field in _TENANT_FIELDS)
                row.append(collection.get('error', ''))
                writer.writerow(row)

        return output.getvalue()

    def format_as_table(self, stats: Dict) -> str:
        """Format statistics as a text table.

        Collections are listed per database, sorted by row count (descending).
        Missing tenant / KB names are rendered as ``N/A``.
        """
        heavy_rule = "=" * 120
        light_rule = "-" * 120

        output = [
            heavy_rule,
            "MILVUS USAGE STATISTICS",
            heavy_rule,
            "",
            f"Milvus URI: {stats['milvus_uri']}",
            f"Total Databases: {stats['total_databases']}",
            f"Total Collections: {stats['total_collections']}",
            f"Collections with Data: {stats['collections_with_data']}",
            f"Empty Collections: {stats['empty_collections']}",
            f"Total Rows: {stats['total_rows']:,}",
        ]
        if 'tenant_kb_mappings_loaded' in stats:
            output.append(f"Tenant/KB Mappings: {stats['tenant_kb_mappings_loaded']}")
        output.append("")

        # Process each database
        for db_stats in stats['databases']:
            output.extend([
                heavy_rule,
                f"DATABASE: {db_stats['database']}",
                heavy_rule,
                "",
                f"Total Collections: {db_stats['total_collections']}",
                f"Collections with Data: {db_stats['collections_with_data']}",
                f"Empty Collections: {db_stats['empty_collections']}",
                f"Total Rows: {db_stats['total_rows']:,}",
                "",
            ])

            if db_stats.get('error'):
                output.append(f"Error: {db_stats['error']}")
                output.append("")
                continue

            output.extend([light_rule, "COLLECTIONS", light_rule, ""])

            # Sort collections by row count (descending)
            collections = sorted(
                db_stats['collections'],
                key=lambda x: x.get('row_count', 0),
                reverse=True
            )

            # Check if tenant info is included
            has_tenant_info = any('tenant_id' in collection for collection in collections)

            for collection in collections:
                status = "✓" if collection['exists'] else "✗"
                output.append(f"{status} {collection['collection_name']}")
                output.append(f" Row Count: {collection.get('row_count', 0):,}")

                if has_tenant_info:
                    # `or` (not a .get default): the keys are present with a
                    # None value for collections unknown to PostgreSQL.
                    output.append(f" Tenant: {collection.get('tenant_name') or 'N/A'}")
                    output.append(f" KB Name: {collection.get('kb_name') or 'N/A'}")
                    kb_display_name = collection.get('kb_display_name')
                    if kb_display_name and kb_display_name != 'N/A':
                        output.append(f" KB Display Name: {kb_display_name}")
                    kb_created_on = collection.get('kb_created_on')
                    if kb_created_on and kb_created_on != 'N/A':
                        output.append(f" KB Created: {kb_created_on}")

                error = collection.get('error', '')
                if error:
                    output.append(f" Error: {error}")
                output.append("")

        output.append(heavy_rule)
        return "\n".join(output)

    def close(self):
        """Close Milvus and PostgreSQL connections.

        Each connection is closed independently, so a failure closing one does
        not prevent the other from being released.
        """
        if self._close_milvus_client():
            logger.info("Milvus client connection closed")

        pg_conn, self.pg_conn = self.pg_conn, None
        if pg_conn:
            try:
                pg_conn.close()
                logger.info("PostgreSQL connection closed")
            except Exception as e:
                logger.warning(f"Error closing PostgreSQL connection: {e}")


def main():
    """Parse CLI arguments, gather the statistics and write them out.

    Exits with status 1 if gathering or writing fails; connections are always
    closed on the way out.
    """
    parser = argparse.ArgumentParser(
        description='Get Milvus usage statistics',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument(
        '--output',
        type=str,
        help='Output file path (default: stdout)'
    )
    parser.add_argument(
        '--output-format',
        choices=['json', 'csv', 'table'],
        default='table',
        help='Output format (default: table)'
    )
    parser.add_argument(
        '--detailed',
        action='store_true',
        help='Include detailed collection statistics (raw stats from Milvus)'
    )
    parser.add_argument(
        '--skip-tenant-info',
        action='store_true',
        help='Skip querying tenant and knowledge-base information from PostgreSQL'
    )
    args = parser.parse_args()

    stats_collector = None
    try:
        stats_collector = MilvusUsageStats()

        # Gather statistics (include tenant info by default unless --skip-tenant-info is specified)
        include_tenant_info = not args.skip_tenant_info
        stats = stats_collector.get_all_usage_stats(
            detailed=args.detailed,
            include_tenant_info=include_tenant_info
        )

        # Format output
        if args.output_format == 'json':
            output = stats_collector.format_as_json(stats)
        elif args.output_format == 'csv':
            output = stats_collector.format_as_csv(stats)
        else:  # table
            output = stats_collector.format_as_table(stats)

        # Write output
        if args.output:
            # UTF-8 because the table contains non-ASCII status marks; newline=''
            # for CSV so the csv module's own line terminators are written verbatim.
            newline = '' if args.output_format == 'csv' else None
            with open(args.output, 'w', encoding='utf-8', newline=newline) as f:
                f.write(output)
            logger.info(f"Results written to {args.output}")
        else:
            print(output)

    except Exception as e:
        logger.error(f"Script failed: {e}", exc_info=True)
        sys.exit(1)
    finally:
        if stats_collector:
            stats_collector.close()


if __name__ == '__main__':
    main()