Milvus

Milvus is an open source vector database that is designed to efficiently store and search large-scale, dynamic vector data. It builds on Faiss, an open source C++ library for vector similarity search that was developed at Facebook. Milvus provides an environment where you can efficiently create, manage, and query vector data, which facilitates the development of intelligent applications.

Milvus Observability with Instana

Using OpenTelemetry with Instana, you can collect traces for Milvus database operations, such as create, insert, upsert, and delete.

Milvus setup

Before you begin, make sure that your environment meets all the prerequisites. For more information, see Prerequisites.

You can connect to a local Milvus instance in several ways. The following steps describe one method: installing and running Milvus on your local machine by using Docker.

  1. Install Docker: Make sure that Docker is installed on your system. You can download it from the official Docker website.

  2. Install Milvus: Milvus provides a Docker Compose configuration file in the Milvus repository. To install Milvus by using Docker Compose, complete the following steps:

    • Download the Docker Compose configuration file by running the following command:

      wget https://github.com/milvus-io/milvus/releases/download/v2.5.5/milvus-standalone-docker-compose.yml -O docker-compose.yml
      
    • Start Milvus by running the following command:

      sudo docker compose up -d
      
    • You will get the following output:

      Creating milvus-etcd  ... done
      Creating milvus-minio ... done
      Creating milvus-standalone ... done
      

After you start Milvus, the following containers will be up and running: milvus-standalone, milvus-minio, and milvus-etcd.

You can check whether the containers are up and running by using the following command:

sudo docker compose ps

You will get the following output:


      Name                     Command                  State                            Ports
--------------------------------------------------------------------------------------------------------------------
milvus-etcd         etcd -advertise-client-url ...   Up             2379/tcp, 2380/tcp
milvus-minio        /usr/bin/docker-entrypoint ...   Up (healthy)   9000/tcp
milvus-standalone   /tini -- milvus run standalone   Up             0.0.0.0:19530->19530/tcp, 0.0.0.0:9091->9091/tcp

After you start the containers, make sure that your Milvus server is running. The server accepts client connections on port 19530; you can also check the health endpoint at http://localhost:9091/healthz.

To install PyMilvus, run the following command:

 pip install pymilvus

To install dependencies for IBM watsonx, run the following command:

pip install ibm-watsonx-ai==1.1.20 langchain-ibm==0.3.1
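The tracer initialization and sample application in the following sections also use the Traceloop SDK. If it is not already covered by your prerequisites, install it as well (the PyPI package name is traceloop-sdk):

```shell
pip install traceloop-sdk
```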

In your LLM application, initialize the Traceloop tracer by adding the following code:

from traceloop.sdk import Traceloop
Traceloop.init()

The following sample application shows how to connect to Milvus, insert data into the database, and perform create, read, update, and delete (CRUD) operations, including insert, delete, upsert, search, get, and query.
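Before you walk through the full application, it helps to see the shape of the records that the insert and upsert tasks build. The following is a minimal sketch, with made-up three-dimensional vectors standing in for real watsonx embeddings:

```python
# Placeholder documents and embeddings; real vectors come from the embedding model
docs = ["first document", "second document"]
vectors = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]

# Each record is a dictionary whose keys map to fields in the collection:
# a primary key, the embedding vector, and any extra scalar fields.
data = [
    {"id": i, "vector": vectors[i], "text": docs[i], "subject": "history"}
    for i in range(len(docs))
]
print(data[0])
# {'id': 0, 'vector': [0.1, 0.2, 0.3], 'text': 'first document', 'subject': 'history'}
```

Scalar fields such as `subject` are stored alongside the vector and can later be used in filter expressions like `subject == 'history'`.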

You can use the following code to generate a sample application named WatsonxEmbeddingMilvus.py:

from pymilvus import MilvusClient

from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task

from langchain_ibm.embeddings import WatsonxEmbeddings 
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames
import os

Traceloop.init(app_name="Watsonx_Embeddings_MilvusClient")

# Connect to Milvus locally
@task(name="setup_milvus_client")
def setup_milvus_client(uri: str, collection_name: str, dimension: int):
    client = MilvusClient(uri=uri)
    # Create the collection, dropping any existing collection with the same name
    if client.has_collection(collection_name=collection_name):
        client.drop_collection(collection_name=collection_name)
    client.create_collection(
        collection_name=collection_name, dimension=dimension, timeout=10, metric_type="COSINE"
    )
    return client


embedding_model = None  # Define Embedding Model globally

# Initialize the watsonx embedding model
@task(name="initialize_embedding_model")
def initialize_embedding_model(
    ibm_cloud_url: str,
    ibm_cloud_api_key: str,
    model_id: str,
    project_id: str,
):
    embed_params = {
        EmbedTextParamsMetaNames.TRUNCATE_INPUT_TOKENS: 3,
        EmbedTextParamsMetaNames.RETURN_OPTIONS: {"input_text": True},
    }
    global embedding_model
    embedding_model = WatsonxEmbeddings(
        url=ibm_cloud_url,
        project_id=project_id,
        model_id=model_id,
        apikey=ibm_cloud_api_key,
        params=embed_params,
    )


# Embed documents and insert them into the Milvus collection
@task(name="encode_documents_and_insert")
def encode_documents_and_insert(
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    docs: list,
    subject: str,
    timeout: float,
):
    vectors = embedding_model.embed_documents(docs)
    data = [
        {"id": i, "vector": vectors[i], "text": docs[i], "subject": subject}
        for i in range(len(vectors))
    ]

    res = client.insert(
        collection_name=collection_name,
        partition_name=partition_name,
        data=data,
        timeout=timeout,
    )
    print(res)


# Embed the query and search for it in the vector database
@task(name="perform_vector_search")
def perform_vector_search(
    client: MilvusClient,
    collection_name: str,
    query: str,
    limit: int,
    output_fields: list,
):
    query_vector = embedding_model.embed_query(query)
    result = client.search(
        collection_name=collection_name,
        partition_names=["partitionA"],
        data=[query_vector],
        limit=limit,
        output_fields=output_fields,
    )
    return result


# Search the vector database with filters applied
@task(name="perform_vector_search_with_filter")
def perform_vector_search_with_filter(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    anns_field: str,
    search_params: dict,
    query: str,
    filter: str,
    limit: int,
    output_fields: list,
    timeout: float,
):
    query_vector = embedding_model.embed_query(query)
    searchResult = client.search(
        collection_name=collection_name,
        partition_names=partition_names,
        search_params=search_params,
        anns_field=anns_field,
        data=[query_vector],
        filter=filter,
        limit=limit,
        output_fields=output_fields,
        timeout=timeout,
    )
    return searchResult


# Query entries in the collection
@task(name="perform_query")
def perform_query(
    client: MilvusClient,
    collection_name: str,
    filter: str,
    output_fields: list,
):
    queryResult = client.query(
        collection_name=collection_name,
        filter=filter,
        partition_names=["partitionA"],
        output_fields=output_fields,
    )
    return queryResult


# Query the database by passing a list of IDs
@task(name="perform_query_ids")
def perform_query_Ids_partition(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    limit: int,
    ids: list,
    output_fields: list,
    timeout: float,
):
    queryResult = client.query(
        collection_name=collection_name,
        partition_names=partition_names,
        limit=limit,
        ids=ids,
        output_fields=output_fields,
        timeout=timeout,
    )
    return queryResult

# delete entries from the collection
@task(name="delete_entities")
def delete_entities(
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    ids: list = None,
    filter: str = None,
    timeout: float = None,
):
    if ids is not None:
        deleteResult = client.delete(
            collection_name=collection_name,
            ids=ids,
            partition_name=partition_name,
            timeout=timeout,
        )
        print(deleteResult)
    if filter is not None:
        deleteRes = client.delete(
            collection_name=collection_name,
            timeout=timeout,
            filter=filter,
            partition_name=partition_name,
        )
        print(deleteRes)

# modify data in the collection
@task(name="upsert_entities")
def upsert_entities( 
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    docs: list,
    ids: list,
    subject: str,
    timeout: float,
):
    vectors = embedding_model.embed_documents(docs)
    data = [
        {"id": ids[i], "vector": vectors[i], "text": docs[i], "subject": subject}
        for i in range(len(vectors))
    ]

    res = client.upsert( 
        collection_name=collection_name,
        partition_name=partition_name,
        data=data,
        timeout=timeout,
    )
    print("Upsert Result:", res)


# Get entities from the collection by ID
@task(name="get_entities")
def get_entities(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    output_fields: list,
    ids: list,
    timeout: float,
):
    result = client.get(
        collection_name=collection_name,
        partition_names=partition_names,
        output_fields=output_fields,
        ids=ids,
        timeout=timeout,
    )
    return result

@workflow(name="milvus_operations_with_watsonx")  
def milvus_operations_with_watsonx():
    client = setup_milvus_client(
        uri="http://127.0.0.1:19530", collection_name="demo_collection", dimension=768
    )
    partition_name = "partitionA"
    client.create_partition(
        collection_name="demo_collection", partition_name=partition_name
    )

    #  Watsonx Embedding model parameters
    ibm_cloud_url = os.getenv("WATSONX_URL")
    ibm_cloud_api_key = os.getenv("WATSONX_API_KEY")
    model_id = (
        "ibm/slate-125m-english-rtrvr"  # or any other supported model
    )
    project_id=os.getenv("WATSONX_PROJECT_ID")

    initialize_embedding_model(
        ibm_cloud_url=ibm_cloud_url,
        ibm_cloud_api_key=ibm_cloud_api_key,
        model_id=model_id,
        project_id=project_id
    )

    docs_history = [
        "Artificial intelligence was founded as an academic discipline in 1956.",
        "Alan Turing was the first person to conduct substantial research in AI.",
        "Born in Maida Vale, London, Turing was raised in southern England.",
    ]

    encode_documents_and_insert(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=docs_history,
        subject="history",
        timeout=10,
    )  

    # Upsert example
    new_docs_history = [
        "Alan Turing developed the Turing Test.",
        "Artificial intelligence continues to evolve.",
    ]
    new_ids_history = [
        0,
        1,
    ]  
    upsert_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=new_docs_history,
        ids=new_ids_history,
        subject="history",
        timeout=10,
    )

    # Get example
    get_result = get_entities(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        output_fields=["text", "subject"],
        ids=new_ids_history,
        timeout=10,
    )
    print("Get Result:", get_result)

    # Semantic Search
    # Vector search
    result = perform_vector_search(
        client=client,
        collection_name="demo_collection",
        query="Who is Alan Turing?",
        limit=2,
        output_fields=["text", "subject"],
    )
    print(result)

    # Vector Search with Metadata Filtering
    docs_biology = [
        "Machine learning has been used for drug design.",
        "Computational synthesis with AI algorithms predicts molecular properties.",
        "DDR1 is involved in cancers and fibrosis.",
    ]

    encode_documents_and_insert(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=docs_biology,
        subject="biology",
        timeout=10,
    )

    search_params = {"metric_type": "COSINE", "params": {}}

    searchResult = perform_vector_search_with_filter(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        anns_field="vector",
        search_params=search_params,
        query="tell me AI related information",
        filter="subject == 'biology'",
        limit=2,
        output_fields=["text", "subject"],
        timeout=10,
    )
    print(searchResult)

    # Perform Query
    queryResult = perform_query(
        client=client,
        collection_name="demo_collection",
        filter="subject == 'history'",
        output_fields=["text", "subject"],
    )
    print(queryResult)

    # Perform Query with ids as input param
    queryResult = perform_query_Ids_partition(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        limit=1,
        ids=[0, 2],
        output_fields=["text", "subject"],
        timeout=10,
    )
    print(queryResult)

    # Delete entities
    delete_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        ids=[0, 2],
        timeout=10,
    )

    # Delete entities by a filter expression
    delete_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        filter="subject == 'biology'",
        timeout=10,
    )


milvus_operations_with_watsonx()
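The collection is created with metric_type="COSINE", so search results are ranked by the cosine similarity between the query vector and the stored vectors. The following self-contained sketch shows the metric itself (it is not Milvus code):

```python
import math

def cosine_similarity(a, b):
    # Dot product of the two vectors divided by the product of their norms
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0 (same direction)
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0 (orthogonal)
```

Because cosine similarity depends only on direction, not magnitude, it is a common choice for text embeddings, where the angle between vectors captures semantic closeness.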

To access IBM watsonx, set the following environment variables:

export WATSONX_URL=<watsonx-url>
export WATSONX_API_KEY=<watsonx-iam-api-key>
export WATSONX_PROJECT_ID=<watsonx-project-id>

Run the sample application to verify the installation and configuration.

python WatsonxEmbeddingMilvus.py

To export traces and metrics to Instana, use one of the following export configurations:

Sending OpenTelemetry traces and logs data to the Instana agent

Agent mode

export TRACELOOP_BASE_URL=<instana-agent-host>:4317
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_LOGGING_ENDPOINT=$TRACELOOP_BASE_URL
export TRACELOOP_METRICS_ENABLED=true
export TRACELOOP_METRICS_ENDPOINT=<otel-dc-llm-host>:8000
export OTEL_EXPORTER_OTLP_METRICS_INSECURE=true
export OTEL_METRIC_EXPORT_INTERVAL=10000
export OTEL_EXPORTER_OTLP_INSECURE=true

For detailed port information, see Sending OpenTelemetry data to the Instana Agent.

Agentless mode

export TRACELOOP_BASE_URL=<instana-otlp-endpoint>:4317
export TRACELOOP_HEADERS="x-instana-key=<agent-key>,x-instana-host=<instana-host>"
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_LOGGING_ENDPOINT=$TRACELOOP_BASE_URL
export TRACELOOP_METRICS_ENABLED=true
export TRACELOOP_METRICS_ENDPOINT=<otel-dc-llm-host>:8000
export OTEL_EXPORTER_OTLP_METRICS_INSECURE=true
export OTEL_METRIC_EXPORT_INTERVAL=10000
export OTEL_EXPORTER_OTLP_INSECURE=true

For more information, see Sending OpenTelemetry data to the Instana backend.

Viewing traces

To create an application perspective to view trace information that is gathered from the LLM application runtime, complete the following steps:

  1. In the Instana UI, open the New Application Perspective wizard in one of the following ways:
    • On the Instana dashboard, in the Applications section, click Add application.
    • From the navigation menu, click Applications > Add, and select New Application Perspective.
  2. Select Services or Endpoints and click Next.
  3. Click Add filter and select a service name. You can select multiple services and endpoints by using OR conditions. The service name is specified by the app_name parameter in Traceloop.init(). For example, Watsonx_Embeddings_MilvusClient.
  4. In the Application Perspective Name field, enter a name for the LLM application perspective. Then, click Create.

The new application perspective is created.

To view trace information, from the navigation menu in the Instana UI, click Analytics. On the Analytics dashboard, you can use applications, services, and endpoints to analyze calls. Instana presents the data by service, endpoint, and call names. You can filter and group traces or calls by arbitrary tags, for example, by filtering on Trace > Service Name equals Watsonx_Embeddings_MilvusClient. For more information, see Analyzing traces and calls.

The traces that are collected from the preceding code are displayed in the Instana UI.

Milvus get traces
Figure 1. Milvus get traces

Milvus insert traces
Figure 2. Milvus insert traces

Milvus query traces
Figure 3. Milvus query traces

Milvus search traces
Figure 4. Milvus search traces