Milvus

Milvus ist eine Open-Source-Vektordatenbank, die für die effiziente Speicherung und Suche in umfangreichen, dynamischen Vektordaten konzipiert ist. Es basiert auf Facebook Faiss, einer Open-Source- C++ -Bibliothek für die Vektorähnlichkeitssuche. Die Nutzung von „ Milvus “ bietet eine Umgebung, in der Sie Vektordaten effizient erstellen, verwalten und abfragen können, was die Entwicklung intelligenter Anwendungen erleichtert.

Milvus Observability with Instana

Mit „ OpenTelemetry “ in Verbindung mit „ Instana “ können Sie Traces für Datenbankoperationen in „ Milvus “ erfassen, beispielsweise für die Befehle „create“, „insert“, „upsert“ und „delete“.

Milvus Einrichtung

Bevor Sie beginnen, stellen Sie sicher, dass Ihre Umgebung alle Voraussetzungen erfüllt. Weitere Informationen finden Sie unter „Voraussetzungen “.

Es gibt viele Möglichkeiten, lokal eine Verbindung zu Milvus herzustellen. Die folgende Methode ist eine der Möglichkeiten, eine Verbindung über Docker herzustellen.

Um „ Milvus “ in Ihrem Projekt zu verwenden, müssen Sie „ Milvus “ installieren und einrichten. Sie können das Programm auch auf Ihrem lokalen Rechner installieren, indem Sie Docker verwenden.

  1. Docker installieren : Stellen Sie sicher, dass Sie Docker auf Ihrem System installiert haben. Sie können es von der offiziellen Website von „ Docker “ herunterladen.

  2. Milvus installieren : Unter Milvus steht im Repository „ Milvus “ eine Konfigurationsdatei für „ Docker “ zur Verfügung. Um „ Milvus “ mithilfe von „ Docker Compose“ zu installieren, gehen Sie wie folgt vor:

    • Laden Sie die Konfigurationsdatei „ docker “ herunter, indem Sie den folgenden Befehl ausführen:

      wget https://github.com/milvus-io/milvus/releases/download/v2.5.5/milvus-standalone-docker-compose.yml -O docker-compose.yml
       
    • Starten Sie „ Milvus “, indem Sie den folgenden Befehl ausführen:

      sudo docker compose up -d
       
    • Sie erhalten folgende Ausgabe:

      Creating milvus-etcd  ... done
      Creating milvus-minio ... done
      Creating milvus-standalone ... done
       

Nachdem Sie „ Milvus “ gestartet haben, sind die folgenden Container in Betrieb: milvus-standalone, milvus-minio, und milvus-etcd.

Mit dem folgenden Befehl können Sie überprüfen, ob die Container laufen:

sudo docker-compose ps
 

Sie erhalten folgende Ausgabe:


      Name                     Command                  State                            Ports
--------------------------------------------------------------------------------------------------------------------
milvus-etcd         etcd -advertise-client-url ...   Up             2379/tcp, 2380/tcp
milvus-minio        /usr/bin/docker-entrypoint ...   Up (healthy)   9000/tcp
milvus-standalone   /tini -- milvus run standalone   Up             0.0.0.0:19530->19530/tcp, 0.0.0.0:9091->9091/tcp

 

Nachdem Sie den Container gestartet haben, überprüfen Sie, ob Ihr „ Milvus “-Server läuft, indem Sie auf die Seite zugreifen http://localhost:19530.

Um „ PyMilvus, “ zu installieren, führen Sie den folgenden Befehl aus:

 pip install pymilvus
 

Um die Abhängigkeiten für IBM watsonx zu installieren, führen Sie den folgenden Befehl aus:

pip install ibm-watsonx-ai==1.1.20 langchain-ibm==0.3.1
 

Initialisieren Sie in Ihrer LLM-Anwendung den Traceloop-Tracer, indem Sie den folgenden Befehl ausführen:

from traceloop.sdk import Traceloop
Traceloop.init()
 

Die folgende Beispielanwendung zeigt, wie man eine Verbindung zu Milvus herstellt, Daten in die Datenbank einfügt und CRUD-Operationen (Create, Read, Update, Delete) durchführt, darunter Einfügen, Löschen, Upsert, Suchen, Abrufen und Abfragen.

Mit dem folgenden Code können Sie eine Beispielanwendung mit dem Namen WatsonxEmbeddingMilvus.py: erstellen

from pymilvus import MilvusClient

from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task

from langchain_ibm.embeddings import WatsonxEmbeddings 
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames
import os

Traceloop.init(app_name="Watsonx_Embeddings_MilvusClient")

# connect to Milvus Locally
@task(name="setup_milvus_client")
def setup_milvus_client(uri: str, collection_name: str, dimension: int):
    client = MilvusClient(uri=uri)
    ##create a collection in Milvus DB
    if client.has_collection(collection_name=collection_name):
        client.drop_collection(collection_name=collection_name)
    client.create_collection(
        collection_name=collection_name, dimension=dimension, timeout=10, metric_type="COSINE"
    )
    return client


embedding_model = None  # Define Embedding Model globally

#Initialize watsonx embedding model 
@task(name="initialize_embedding_model")
def initialize_embedding_model(
    ibm_cloud_url: str,
    ibm_cloud_api_key: str,
    model_id: str,
    project_id: str,
    model_kwargs: dict = None,
    encode_kwargs: dict = None,
):
    embed_params = {
    EmbedTextParamsMetaNames.TRUNCATE_INPUT_TOKENS: 3,
    EmbedTextParamsMetaNames.RETURN_OPTIONS: {"input_text": True},
    }
    global embedding_model
    model_kwargs = model_kwargs or {}
    encode_kwargs = encode_kwargs or {"normalize_embeddings": False}

    embedding_model = WatsonxEmbeddings(
        url=ibm_cloud_url,
        project_id=project_id,
        model_id=model_id,
        apikey=ibm_cloud_api_key,
        params=embed_params
    )


# embed Documents and insert it to Milvus DB
@task(name="encode_documents_and_insert")
def encode_documents_and_insert(
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    docs: list,
    subject: str,
    timeout: float,
):
    vectors = embedding_model.embed_documents(docs)
    data = [
        {"id": i, "vector": vectors[i], "text": docs[i], "subject": subject}
        for i in range(len(vectors))
    ]

    res = client.insert(
        collection_name=collection_name,
        partition_name=partition_name,
        data=data,
        timeout=timeout,
    )
    print(res)


# apply vector embedding on the query and search the same in the vecotr db
@task(name="perform_vector_search")
def perform_vector_search(
    client: MilvusClient,
    collection_name: str,
    query: str,
    limit: int,
    output_fields: list,
):
    query_vector = embedding_model.embed_query(query)
    result = client.search(
        collection_name=collection_name,
        partition_name="partitionA",
        data=[query_vector],
        limit=limit,
        output_fields=output_fields,
    )
    return result


# search in vecotr db with filters applied
@task(name="perform_vector_search_with_filter")
def perform_vector_search_with_filter(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    anns_field: str,
    search_params: dict,
    query: str,
    filter: str,
    limit: int,
    output_fields: list,
    timeout: float,
):
    query_vector = embedding_model.embed_query(query)
    searchResult = client.search(
        collection_name=collection_name,
        partition_names=partition_names,
        search_params=search_params,
        anns_field=anns_field,
        data=[query_vector],
        filter=filter,
        limit=limit,
        output_fields=output_fields,
        timeout=timeout,
    )
    return searchResult


# query for entries in the Collection 
@task(name="perform_query")
def perform_query(
    client: MilvusClient,
    collection_name: str,
    filter: str,
    output_fields: list,
):
    queryResult = client.query(
        collection_name=collection_name,
        filter=filter,
        partition_names=["partitionA"],
        output_fields=output_fields,
    )
    return queryResult


# query the db passing list of ids
@task(name="perform_query_ids")
def perform_query_Ids_partition(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    limit: int,
    ids: list,
    output_fields: list,
    timeout: float,
):
    queryResult = client.query(
        collection_name=collection_name,
        partition_names=partition_names,
        limit=limit,
        ids=ids,
        timeout=timeout,
    )
    return queryResult

# delete entries from the collection
@task(name="delete_entities")
def delete_entities(
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    ids: list = None,
    filter: str = None,
    timeout: float = None,
):
    if ids is not None:
        deleteResult = client.delete(collection_name=collection_name, ids=ids)
        print(deleteResult)
    if filter is not None:
        deleteRes = client.delete(
            collection_name=collection_name,
            timeout=timeout,
            filter=filter,
            partition_name=partition_name,
        )
        print(deleteRes)

# modify data in the collection
@task(name="upsert_entities")
def upsert_entities( 
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    docs: list,
    ids: list,
    subject: str,
    timeout: float,
):
    vectors = embedding_model.embed_documents(docs)
    data = [
        {"id": ids[i], "vector": vectors[i], "text": docs[i], "subject": subject}
        for i in range(len(vectors))
    ]

    res = client.upsert( 
        collection_name=collection_name,
        partition_name=partition_name,
        data=data,
        timeout=timeout,
    )
    print("Upsert Result:", res)


@task(name="get_entities")
def get_entities(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    output_fields: list,
    ids: list,
    timeout: float,
):
    result = client.get(
        collection_name=collection_name,
        partition_names=partition_names,
        output_fields=output_fields,
        ids=ids,
        timeout=timeout,
    )
    return result

@workflow(name="milvus_operations_with_watsonx")  
def milvus_operations_with_watsonx():
    client = setup_milvus_client(
        uri="http://127.0.0.1:19530", collection_name="demo_collection", dimension=768
    )
    partition_name = "partitionA"
    client.create_partition(
        collection_name="demo_collection", partition_name=partition_name
    )

    #  Watsonx Embedding model parameters
    ibm_cloud_url = os.getenv("WATSONX_URL")
    ibm_cloud_api_key = os.getenv("WATSONX_API_KEY")
    model_id = (
        "ibm/slate-125m-english-rtrvr"  # or any other supported model
    )
    project_id=os.getenv("WATSONX_PROJECT_ID")

    initialize_embedding_model(
        ibm_cloud_url=ibm_cloud_url,
        ibm_cloud_api_key=ibm_cloud_api_key,
        model_id=model_id,
        project_id=project_id
    )

    docs_history = [
        "Artificial intelligence was founded as an academic discipline in 1956.",
        "Alan Turing was the first person to conduct substantial research in AI.",
        "Born in Maida Vale, London, Turing was raised in southern England.",
    ]

    encode_documents_and_insert(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=docs_history,
        subject="history",
        timeout=10,
    )  

    # Upsert example
    new_docs_history = [
        "Alan Turing developed the Turing Test.",
        "Artificial intelligence continues to evolve.",
    ]
    new_ids_history = [
        0,
        1,
    ]  
    upsert_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=new_docs_history,
        ids=new_ids_history,
        subject="history",
        timeout=10,
    )

    # Get example
    get_result = get_entities(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        output_fields=["text", "subject"],
        ids=new_ids_history,
        timeout=10,
    )
    print("Get Result:", get_result)

    # Semantic Search
    # Vector search
    result = perform_vector_search(
        client=client,
        collection_name="demo_collection",
        query="Who is Alan Turing?",
        limit=2,
        output_fields=["text", "subject"],
    )
    print(result)

    # Vector Search with Metadata Filtering
    docs_biology = [
        "Machine learning has been used for drug design.",
        "Computational synthesis with AI algorithms predicts molecular properties.",
        "DDR1 is involved in cancers and fibrosis.",
    ]

    encode_documents_and_insert(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=docs_biology,
        subject="biology",
        timeout=10,
    )

    search_params = {"metric_type": "COSINE", "params": {}}

    searchResult = perform_vector_search_with_filter(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        anns_field="vector",
        search_params=search_params,
        query="tell me AI related information",
        filter="subject == 'biology'",
        limit=2,
        output_fields=["text", "subject"],
        timeout=10,
    )
    print(searchResult)

    # Perform Query
    queryResult = perform_query(
        client=client,
        collection_name="demo_collection",
        filter="subject == 'history'",
        output_fields=["text", "subject"],
    )
    print(queryResult)

    # Perform Query with ids as input param
    queryResult = perform_query_Ids_partition(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        limit=1,
        ids=[0, 2],
        output_fields=["text", "subject"],
        timeout=10,
    )
    print(queryResult)

    # Delete entities
    delete_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        ids=[0, 2],
        timeout=10,
    )

    # 8. Delete entities by a filter expression
    delete_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        filter="subject == 'biology'",
        timeout=10,
    )


milvus_operations_with_watsonx()
 

Um auf IBM watsonx zuzugreifen, benötigen Sie folgende Anmeldedaten:

export WATSONX_URL=<watsonx-url>
export WATSONX_API_KEY=<watsonx-iam-api-key>
export WATSONX_PROJECT_ID=<watsonx-project-id>
 

Führen Sie die Beispielanwendung aus, um die Installation und Konfiguration zu überprüfen.

python WatsonxEmbeddingMilvus.py
 

Um Traces und Metriken in „ Instana “ zu exportieren, verwenden Sie die folgende Exportkonfiguration:

OpenTelemetry -Traces, Metriken und Protokolldaten an den „ Instana “-Agenten senden

Agentenmodus

Konfigurieren Sie Ihre Anwendung so, dass sie Daten an den Endpunkt des „ Instana “-Agenten sendet:

export OTEL_RESOURCE_ATTRIBUTES="INSTANA_PLUGIN=genai"
export TRACELOOP_BASE_URL=<instana-agent-host>:4317
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_METRICS_ENABLED=true
export OTEL_EXPORTER_OTLP_INSECURE=true
 

Ausführliche Informationen zu den Ports finden Sie unter „Senden von Daten aus dem „ OpenTelemetry “ an den „ Instana “-Agenten “.

Agentenloser Modus

Konfigurieren Sie Ihre Anwendung so, dass Daten direkt an das Backend von „ Instana “ gesendet werden:

export OTEL_RESOURCE_ATTRIBUTES="INSTANA_PLUGIN=genai"

export TRACELOOP_BASE_URL=<instana-otlp-endpoint>:4317
export TRACELOOP_HEADERS="x-instana-key=<agent-key>,x-instana-host=<instana-host>"
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_METRICS_ENABLED=true
export OTEL_EXPORTER_OTLP_INSECURE=false
 

Weitere Informationen finden Sie unter „ OpenTelemetry -Daten an das Backend von Instana senden “.

Spuren anzeigen

Führen Sie die folgenden Schritte aus, um eine Anwendungsperspektive zu erstellen, mit der Sie Trace-Informationen anzeigen können, die während der Laufzeit der LLM-Anwendung erfasst wurden:

  1. Öffnen Sie in der Benutzeroberfläche von „ Instana “ den Assistenten „New Application Perspective“ auf eine der folgenden Arten:
    • Gehen Sie im Dashboard von „ Instana “ zum Abschnitt „Anwendungen “ und klicken Sie auf „Anwendung hinzufügen “.
    • Klicken Sie im Navigationsmenü auf „Anwendungen“ > „Hinzufügen“ und wählen Sie „Neue Anwendungsperspektive“ aus.
  2. Wählen Sie „Dienste“ oder „Endpunkte“ aus und klicken Sie auf „Weiter “.
  3. Klicken Sie auf „Filter hinzufügen“ und wählen Sie einen Dienstnamen aus. Mithilfe von OR Bedingungen können Sie mehrere Dienste und Endpunkte auswählen. Der Name des Dienstes wird durch den app_name Parameter in angegeben Traceloop.init(). Beispiel: Watsonx_Embeddings_MilvusClient.
  4. Geben Sie im Feld „Name der Anwendungsperspektive“ einen Namen für die LLM-Anwendungsperspektive ein. Klicken Sie anschließend auf „Erstellen “.

Die neue Anwendungsperspektive wurde erstellt.

Um Trace-Informationen anzuzeigen, wählen Sie im Navigationsmenü der Benutzeroberfläche von „ Instana “ die Option „Analytics“ aus. Im Analytics-Dashboard können Sie Aufrufe anhand von Anwendungen, Diensten und Endpunkten analysieren. Instana stellt die Daten anhand von Dienst-, Endpunkt- und Aufrufnamen dar. Sie können Traces oder Aufrufe anhand beliebiger Tags filtern und gruppieren, beispielsweise nach 'Trace->Service Name' equals Watsonx_Embeddings_MilvusClient. Weitere Informationen finden Sie unter „Traces und Aufrufe analysieren “.

Die aus dem vorstehenden Code erfassten Traces werden in der Benutzeroberfläche von „ Instana “ angezeigt.

Abbildung 1. Milvus Spuren finden
Milvus Spuren finden
Abbildung 2. Milvus Spuren einfügen
Milvus Spuren einfügen
Abbildung 3. Milvus Abfrageverläufe
Milvus Abfrageverläufe
Abbildung 4. Milvus Suchverläufe
Milvus Suchverläufe

Metriken anzeigen

Um die Kennzahlen anzuzeigen, die bei den Datenbankvorgängen von „ Milvus “ erfasst wurden, führen Sie die folgenden Schritte aus:

  1. Wählen Sie im Navigationsmenü der Benutzeroberfläche von „ Instana “ die Option „Infrastruktur“ aus.
  2. Klicken Sie auf „Infrastruktur analysieren “.
  3. Wählen Sie aus der Liste der Entitätstypen „ OTel “ und „ Milvus “ aus.
  4. Klicken Sie auf die Entitätsinstanz des DB -Entitätstyps „ OTel “ unter „ Milvus “. Das zugehörige Dashboard wird angezeigt.

Das Metrik-Dashboard bietet Einblicke in die Leistung und den Zustand Ihrer „ Milvus “-Datenbank, darunter:

  • Operationskennzahlen : Zeigen Sie die Anzahl der durchgeführten Operationen (Einfügungen, Upserts und Löschungen) im Zeitverlauf an.
  • Suchabstand : Abstand zwischen dem Suchanfragevektor und den übereinstimmenden Vektoren.
  • Latenzkennzahlen : Überwachen Sie die Antwortzeit bei Abfragevorgängen.
Abbildung 5. Milvus Metrik-Dashboard
Milvus Metrik-Dashboard