Milvus

Milvus est une base de données vectorielle open source conçue pour stocker et rechercher efficacement des données vectorielles dynamiques à grande échelle. Il a été développé à l'aide de Facebook Faiss, une bibliothèque open source d' C++ s destinée à la recherche de similitudes vectorielles. L'utilisation d' Milvus offre un environnement permettant de créer, de gérer et d'interroger efficacement des données vectorielles, facilitant ainsi le développement d'applications intelligentes.

Milvus Observability with Instana

En utilisant OpenTelemetry avec Instana, vous pouvez collecter des traces pour les opérations sur la base de données Milvus, telles que create, insert, upsert et delete.

Milvus configuration

Avant de commencer, assurez-vous que votre environnement répond à toutes les conditions préalables. Pour plus d'informations, consultez la section « Conditions préalables ».

Il existe plusieurs façons de se connecter localement à Milvus. La méthode suivante est l'une des méthodes permettant de se connecter à l'aide d' Docker

Pour commencer à utiliser Milvus dans votre projet, vous devez installer et configurer Milvus. Vous pouvez également l'installer sur votre ordinateur en utilisant Docker.

  1. Installer l' Docker : assurez-vous d'avoir installé l' Docker sur votre système. Vous pouvez le télécharger depuis le site officiel d' Docker.

  2. Installation d' Milvus : le site Milvus met à disposition un fichier de configuration « compose » pour Docker dans le référentiel Milvus. Pour installer Milvus à l'aide de la commande ` Docker `, procédez comme suit :

    • Téléchargez le fichier de configuration « docker » en exécutant la commande suivante :

      wget https://github.com/milvus-io/milvus/releases/download/v2.5.5/milvus-standalone-docker-compose.yml -O docker-compose.yml
       
    • Lancez « Milvus » en exécutant la commande suivante :

      sudo docker compose up -d
       
    • Vous obtiendrez le résultat suivant :

      Creating milvus-etcd  ... done
      Creating milvus-minio ... done
      Creating milvus-standalone ... done
       

Une fois que vous aurez lancé Milvus, les conteneurs suivants seront opérationnels : milvus-standalone, milvus-minio, et milvus-etcd.

Vous pouvez vérifier si les conteneurs sont opérationnels à l'aide de la commande suivante :

sudo docker-compose ps
 

Vous obtiendrez le résultat suivant :


      Name                     Command                  State                            Ports
--------------------------------------------------------------------------------------------------------------------
milvus-etcd         etcd -advertise-client-url ...   Up             2379/tcp, 2380/tcp
milvus-minio        /usr/bin/docker-entrypoint ...   Up (healthy)   9000/tcp
milvus-standalone   /tini -- milvus run standalone   Up             0.0.0.0:19530->19530/tcp, 0.0.0.0:9091->9091/tcp

 

Une fois le conteneur démarré, vérifiez que votre serveur Milvus est bien en cours d'exécution en vous rendant sur http://localhost:19530.

Pour installer PyMilvus,, exécutez la commande suivante :

 pip install pymilvus
 

Pour installer les dépendances d' IBM watsonx, exécutez la commande suivante :

pip install ibm-watsonx-ai==1.1.20 langchain-ibm==0.3.1
 

Dans votre application LLM, initialisez le traceur Traceloop en exécutant la commande suivante :

from traceloop.sdk import Traceloop
Traceloop.init()
 

L'exemple d'application suivant montre comment se connecter à Milvus, insérer des données dans la base de données et effectuer des opérations de création, lecture, mise à jour et suppression (CRUD), notamment l'insertion, la suppression, l'upsert, la recherche, la récupération et l'interrogation.

Vous pouvez utiliser le code suivant pour générer une application d'exemple nommée WatsonxEmbeddingMilvus.py:

from pymilvus import MilvusClient

from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task

from langchain_ibm.embeddings import WatsonxEmbeddings 
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames
import os

Traceloop.init(app_name="Watsonx_Embeddings_MilvusClient")

# connect to Milvus Locally
@task(name="setup_milvus_client")
def setup_milvus_client(uri: str, collection_name: str, dimension: int):
    client = MilvusClient(uri=uri)
    ##create a collection in Milvus DB
    if client.has_collection(collection_name=collection_name):
        client.drop_collection(collection_name=collection_name)
    client.create_collection(
        collection_name=collection_name, dimension=dimension, timeout=10, metric_type="COSINE"
    )
    return client


embedding_model = None  # Define Embedding Model globally

#Initialize watsonx embedding model 
@task(name="initialize_embedding_model")
def initialize_embedding_model(
    ibm_cloud_url: str,
    ibm_cloud_api_key: str,
    model_id: str,
    project_id: str,
    model_kwargs: dict = None,
    encode_kwargs: dict = None,
):
    embed_params = {
    EmbedTextParamsMetaNames.TRUNCATE_INPUT_TOKENS: 3,
    EmbedTextParamsMetaNames.RETURN_OPTIONS: {"input_text": True},
    }
    global embedding_model
    model_kwargs = model_kwargs or {}
    encode_kwargs = encode_kwargs or {"normalize_embeddings": False}

    embedding_model = WatsonxEmbeddings(
        url=ibm_cloud_url,
        project_id=project_id,
        model_id=model_id,
        apikey=ibm_cloud_api_key,
        params=embed_params
    )


# embed Documents and insert it to Milvus DB
@task(name="encode_documents_and_insert")
def encode_documents_and_insert(
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    docs: list,
    subject: str,
    timeout: float,
):
    vectors = embedding_model.embed_documents(docs)
    data = [
        {"id": i, "vector": vectors[i], "text": docs[i], "subject": subject}
        for i in range(len(vectors))
    ]

    res = client.insert(
        collection_name=collection_name,
        partition_name=partition_name,
        data=data,
        timeout=timeout,
    )
    print(res)


# apply vector embedding on the query and search the same in the vecotr db
@task(name="perform_vector_search")
def perform_vector_search(
    client: MilvusClient,
    collection_name: str,
    query: str,
    limit: int,
    output_fields: list,
):
    query_vector = embedding_model.embed_query(query)
    result = client.search(
        collection_name=collection_name,
        partition_name="partitionA",
        data=[query_vector],
        limit=limit,
        output_fields=output_fields,
    )
    return result


# search in vecotr db with filters applied
@task(name="perform_vector_search_with_filter")
def perform_vector_search_with_filter(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    anns_field: str,
    search_params: dict,
    query: str,
    filter: str,
    limit: int,
    output_fields: list,
    timeout: float,
):
    query_vector = embedding_model.embed_query(query)
    searchResult = client.search(
        collection_name=collection_name,
        partition_names=partition_names,
        search_params=search_params,
        anns_field=anns_field,
        data=[query_vector],
        filter=filter,
        limit=limit,
        output_fields=output_fields,
        timeout=timeout,
    )
    return searchResult


# query for entries in the Collection 
@task(name="perform_query")
def perform_query(
    client: MilvusClient,
    collection_name: str,
    filter: str,
    output_fields: list,
):
    queryResult = client.query(
        collection_name=collection_name,
        filter=filter,
        partition_names=["partitionA"],
        output_fields=output_fields,
    )
    return queryResult


# query the db passing list of ids
@task(name="perform_query_ids")
def perform_query_Ids_partition(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    limit: int,
    ids: list,
    output_fields: list,
    timeout: float,
):
    queryResult = client.query(
        collection_name=collection_name,
        partition_names=partition_names,
        limit=limit,
        ids=ids,
        timeout=timeout,
    )
    return queryResult

# delete entries from the collection
@task(name="delete_entities")
def delete_entities(
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    ids: list = None,
    filter: str = None,
    timeout: float = None,
):
    if ids is not None:
        deleteResult = client.delete(collection_name=collection_name, ids=ids)
        print(deleteResult)
    if filter is not None:
        deleteRes = client.delete(
            collection_name=collection_name,
            timeout=timeout,
            filter=filter,
            partition_name=partition_name,
        )
        print(deleteRes)

# modify data in the collection
@task(name="upsert_entities")
def upsert_entities( 
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    docs: list,
    ids: list,
    subject: str,
    timeout: float,
):
    vectors = embedding_model.embed_documents(docs)
    data = [
        {"id": ids[i], "vector": vectors[i], "text": docs[i], "subject": subject}
        for i in range(len(vectors))
    ]

    res = client.upsert( 
        collection_name=collection_name,
        partition_name=partition_name,
        data=data,
        timeout=timeout,
    )
    print("Upsert Result:", res)


@task(name="get_entities")
def get_entities(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    output_fields: list,
    ids: list,
    timeout: float,
):
    result = client.get(
        collection_name=collection_name,
        partition_names=partition_names,
        output_fields=output_fields,
        ids=ids,
        timeout=timeout,
    )
    return result

@workflow(name="milvus_operations_with_watsonx")  
def milvus_operations_with_watsonx():
    client = setup_milvus_client(
        uri="http://127.0.0.1:19530", collection_name="demo_collection", dimension=768
    )
    partition_name = "partitionA"
    client.create_partition(
        collection_name="demo_collection", partition_name=partition_name
    )

    #  Watsonx Embedding model parameters
    ibm_cloud_url = os.getenv("WATSONX_URL")
    ibm_cloud_api_key = os.getenv("WATSONX_API_KEY")
    model_id = (
        "ibm/slate-125m-english-rtrvr"  # or any other supported model
    )
    project_id=os.getenv("WATSONX_PROJECT_ID")

    initialize_embedding_model(
        ibm_cloud_url=ibm_cloud_url,
        ibm_cloud_api_key=ibm_cloud_api_key,
        model_id=model_id,
        project_id=project_id
    )

    docs_history = [
        "Artificial intelligence was founded as an academic discipline in 1956.",
        "Alan Turing was the first person to conduct substantial research in AI.",
        "Born in Maida Vale, London, Turing was raised in southern England.",
    ]

    encode_documents_and_insert(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=docs_history,
        subject="history",
        timeout=10,
    )  

    # Upsert example
    new_docs_history = [
        "Alan Turing developed the Turing Test.",
        "Artificial intelligence continues to evolve.",
    ]
    new_ids_history = [
        0,
        1,
    ]  
    upsert_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=new_docs_history,
        ids=new_ids_history,
        subject="history",
        timeout=10,
    )

    # Get example
    get_result = get_entities(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        output_fields=["text", "subject"],
        ids=new_ids_history,
        timeout=10,
    )
    print("Get Result:", get_result)

    # Semantic Search
    # Vector search
    result = perform_vector_search(
        client=client,
        collection_name="demo_collection",
        query="Who is Alan Turing?",
        limit=2,
        output_fields=["text", "subject"],
    )
    print(result)

    # Vector Search with Metadata Filtering
    docs_biology = [
        "Machine learning has been used for drug design.",
        "Computational synthesis with AI algorithms predicts molecular properties.",
        "DDR1 is involved in cancers and fibrosis.",
    ]

    encode_documents_and_insert(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=docs_biology,
        subject="biology",
        timeout=10,
    )

    search_params = {"metric_type": "COSINE", "params": {}}

    searchResult = perform_vector_search_with_filter(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        anns_field="vector",
        search_params=search_params,
        query="tell me AI related information",
        filter="subject == 'biology'",
        limit=2,
        output_fields=["text", "subject"],
        timeout=10,
    )
    print(searchResult)

    # Perform Query
    queryResult = perform_query(
        client=client,
        collection_name="demo_collection",
        filter="subject == 'history'",
        output_fields=["text", "subject"],
    )
    print(queryResult)

    # Perform Query with ids as input param
    queryResult = perform_query_Ids_partition(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        limit=1,
        ids=[0, 2],
        output_fields=["text", "subject"],
        timeout=10,
    )
    print(queryResult)

    # Delete entities
    delete_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        ids=[0, 2],
        timeout=10,
    )

    # 8. Delete entities by a filter expression
    delete_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        filter="subject == 'biology'",
        timeout=10,
    )


milvus_operations_with_watsonx()
 

Pour accéder à IBM watsonx, vous devez disposer des identifiants suivants :

export WATSONX_URL=<watsonx-url>
export WATSONX_API_KEY=<watsonx-iam-api-key>
export WATSONX_PROJECT_ID=<watsonx-project-id>
 

Lancez l'application d'exemple pour vérifier l'installation et la configuration.

python WatsonxEmbeddingMilvus.py
 

Pour exporter des traces et des métriques vers Instana, utilisez la configuration d'exportation suivante :

Envoi des données de traces, de métriques et de journaux d' OpenTelemetry s à l'agent Instana

Mode de l'agent

Configurez votre application pour qu'elle envoie des données au point de terminaison de l'agent « Instana » :

export OTEL_RESOURCE_ATTRIBUTES="INSTANA_PLUGIN=genai"
export TRACELOOP_BASE_URL=<instana-agent-host>:4317
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_METRICS_ENABLED=true
export OTEL_EXPORTER_OTLP_INSECURE=true
 

Pour plus d'informations sur les ports, consultez la section « Envoi de données d' OpenTelemetry s à l'agent d' Instana ».

Mode sans agent

Configurez votre application pour qu'elle envoie les données directement au serveur d' Instana :

export OTEL_RESOURCE_ATTRIBUTES="INSTANA_PLUGIN=genai"

export TRACELOOP_BASE_URL=<instana-otlp-endpoint>:4317
export TRACELOOP_HEADERS="x-instana-key=<agent-key>,x-instana-host=<instana-host>"
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_METRICS_ENABLED=true
export OTEL_EXPORTER_OTLP_INSECURE=false
 

Pour plus d'informations, consultez la section « Envoi de données d' OpenTelemetry s vers le backend Instana ».

Affichage des traces

Pour créer une perspective d'application permettant de consulter les informations de trace recueillies lors de l'exécution de l'application LLM, procédez comme suit :

  1. Dans l'interface utilisateur d' Instana, ouvrez l'assistant « New Application Perspective » de l'une des manières suivantes :
    • Dans le tableau de bord d' Instana, dans la section Applications, cliquez sur Ajouter une application.
    • Dans le menu de navigation, cliquez sur Applications > Ajouter, puis sélectionnez Nouvelle perspective d'application.
  2. Sélectionnez « Services » ou « Points de terminaison », puis cliquez sur « Suivant ».
  3. Cliquez sur « Ajouter un filtre » et sélectionnez le nom d'un service. Vous pouvez sélectionner plusieurs services et points de terminaison à l'aide de OR conditions. Le nom du service est spécifié par le app_name paramètre dans Traceloop.init(). Par exemple, Watsonx_Embeddings_MilvusClient.
  4. Dans le champ « Nom de la perspective d'application », saisissez un nom pour la perspective d'application LLM. Cliquez ensuite sur Créer.

La nouvelle perspective d'application est créée.

Pour consulter les informations de suivi, accédez au menu de navigation de l'interface utilisateur d' Instana, puis cliquez sur « Analytics ». Dans le tableau de bord Analytics, vous pouvez utiliser les applications, les services et les points de terminaison pour analyser les appels. Instana présente les données en utilisant les noms des services, des points de terminaison et des appels. Vous pouvez filtrer et regrouper les traces ou les appels à l'aide de balises de votre choix, par exemple en les filtrant par 'Trace->Service Name' equals Watsonx_Embeddings_MilvusClient. Pour plus d'informations, consultez la section « Analyse des traces et des appels ».

Les traces collectées à partir du code précédent s'affichent dans l'interface utilisateur d' Instana.

Figure 1. Milvus obtenir des traces
Milvus obtenir des traces
Figure 2. Milvus insérer des pistes
Milvus insérer des pistes
Figure 3 Milvus traces de requêtes
Milvus traces de requêtes
Figure 4 Milvus traces de recherche
Milvus traces de recherche

Affichage des mesures

Pour consulter les indicateurs recueillis à partir des opérations de votre base de données Milvus, procédez comme suit :

  1. Dans le menu de navigation de l'interface utilisateur d' Instana, sélectionnez « Infrastructure ».
  2. Cliquez sur « Analyser l'infrastructure ».
  3. Dans la liste des types d'entités, sélectionnez « OTel Milvus DB ».
  4. Cliquez sur l'instance de l'entité « OTel » du type d'entité de base de données « Milvus ». Le tableau de bord correspondant s'affiche.

Le tableau de bord des indicateurs fournit des informations sur les performances et l'état de santé de votre base de données Milvus, notamment :

  • Indicateurs d'opérations : consultez le nombre d'opérations effectuées (insertions, mises à jour avec insertion et suppressions) au fil du temps.
  • Distance de recherche : distance entre le vecteur de requête et les vecteurs correspondants.
  • Indicateurs de latence : surveillez le temps de réponse des opérations de requête.
Figure 5. Milvus tableau de bord des indicateurs
Milvus tableau de bord des indicateurs