Milvus

Milvus 대규모의 동적 벡터 데이터를 효율적으로 저장하고 검색할 수 있도록 설계된 오픈 소스 벡터 데이터베이스입니다. 이 프로젝트는 벡터 유사도 검색을 위한 오픈소스 C++ 라이브러리인 Facebook Faiss를 기반으로 개발되었습니다. Milvus 를 사용하면 벡터 데이터를 효율적으로 생성, 관리 및 쿼리할 수 있는 환경을 제공하여 지능형 애플리케이션 개발을 용이하게 합니다.

Milvus Observability with Instana

OpenTelemetry 와 Instana 를 함께 사용하면 create, insert, upsert, delete와 같은 Milvus 데이터베이스 작업에 대한 트레이스를 수집할 수 있습니다.

Milvus 설정

시작하기 전에, 사용 중인 환경이 모든 필수 조건을 충족하는지 확인하십시오. 자세한 내용은 ‘필수 조건’을 참조하십시오.

Milvus 에 로컬로 연결하는 방법은 여러 가지가 있습니다. 다음은 Docker 를 사용하여 연결하는 방법 중 하나입니다.

프로젝트에서 Milvus 를 사용하려면 Milvus 를 설치하고 설정해야 합니다. Docker 를 사용하여 로컬 컴퓨터에 설치할 수도 있습니다.

  1. Docker 설치 : 시스템에 Docker 가 설치되어 있는지 확인하십시오. Docker 공식 웹사이트에서 다운로드할 수 있습니다.

  2. Milvus 설치 : Milvus 은 Milvus 저장소에서 Docker 컴포즈 구성 파일을 제공합니다. Docker 의 Compose를 사용하여 Milvus 를 설치하려면 다음 단계를 따르세요:

    • 다음 명령을 실행하여 ` docker ` 구성 파일을 다운로드하십시오:

      wget https://github.com/milvus-io/milvus/releases/download/v2.5.5/milvus-standalone-docker-compose.yml -O docker-compose.yml
       
    • 다음 명령을 실행하여 Milvus 를 시작하십시오:

      sudo docker compose up -d
       
    • 결과물은 다음과 같습니다.

      Creating milvus-etcd  ... done
      Creating milvus-minio ... done
      Creating milvus-standalone ... done
       

Milvus 를 시작하면,, milvus-minio 및 컨테이너가 milvus-standalone milvus-etcd실행됩니다.

다음 명령어를 사용하여 컨테이너가 정상적으로 실행 중인지 확인할 수 있습니다:

sudo docker-compose ps
 

결과물은 다음과 같습니다.


      Name                     Command                  State                            Ports
--------------------------------------------------------------------------------------------------------------------
milvus-etcd         etcd -advertise-client-url ...   Up             2379/tcp, 2380/tcp
milvus-minio        /usr/bin/docker-entrypoint ...   Up (healthy)   9000/tcp
milvus-standalone   /tini -- milvus run standalone   Up             0.0.0.0:19530->19530/tcp, 0.0.0.0:9091->9091/tcp

 

컨테이너를 시작한 후, 에 접속하여 http://localhost:19530Milvus 서버가 실행 중인지 확인하십시오.

PyMilvus, 를 설치하려면 다음 명령을 실행하십시오:

 pip install pymilvus
 

IBM 의 종속성을 설치하려면 watsonx, 다음 명령어를 실행하십시오:

pip install ibm-watsonx-ai==1.1.20 langchain-ibm==0.3.1
 

LLM 애플리케이션에서 다음 명령을 실행하여 Traceloop 트레이서를 초기화하십시오:

from traceloop.sdk import Traceloop
Traceloop.init()
 

다음 샘플 애플리케이션은 Milvus 에 연결하고, 데이터베이스에 데이터를 삽입하며, 삽입(insert), 삭제(delete), 업서트(upsert), 검색(search), 가져오기(get), 쿼리(query) 등을 포함하는 생성(create), 읽기(read), 업데이트(update), 삭제(delete) 작업(CRUD)을 수행하는 방법을 보여줍니다.

다음 코드를 사용하여 다음과 같은 WatsonxEmbeddingMilvus.py이름의 샘플 애플리케이션을 생성할 수 있습니다:

from pymilvus import MilvusClient

from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task

from langchain_ibm.embeddings import WatsonxEmbeddings 
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames
import os

Traceloop.init(app_name="Watsonx_Embeddings_MilvusClient")

# connect to Milvus Locally
@task(name="setup_milvus_client")
def setup_milvus_client(uri: str, collection_name: str, dimension: int):
    client = MilvusClient(uri=uri)
    ##create a collection in Milvus DB
    if client.has_collection(collection_name=collection_name):
        client.drop_collection(collection_name=collection_name)
    client.create_collection(
        collection_name=collection_name, dimension=dimension, timeout=10, metric_type="COSINE"
    )
    return client


embedding_model = None  # Define Embedding Model globally

#Initialize watsonx embedding model 
@task(name="initialize_embedding_model")
def initialize_embedding_model(
    ibm_cloud_url: str,
    ibm_cloud_api_key: str,
    model_id: str,
    project_id: str,
    model_kwargs: dict = None,
    encode_kwargs: dict = None,
):
    embed_params = {
    EmbedTextParamsMetaNames.TRUNCATE_INPUT_TOKENS: 3,
    EmbedTextParamsMetaNames.RETURN_OPTIONS: {"input_text": True},
    }
    global embedding_model
    model_kwargs = model_kwargs or {}
    encode_kwargs = encode_kwargs or {"normalize_embeddings": False}

    embedding_model = WatsonxEmbeddings(
        url=ibm_cloud_url,
        project_id=project_id,
        model_id=model_id,
        apikey=ibm_cloud_api_key,
        params=embed_params
    )


# embed Documents and insert it to Milvus DB
@task(name="encode_documents_and_insert")
def encode_documents_and_insert(
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    docs: list,
    subject: str,
    timeout: float,
):
    vectors = embedding_model.embed_documents(docs)
    data = [
        {"id": i, "vector": vectors[i], "text": docs[i], "subject": subject}
        for i in range(len(vectors))
    ]

    res = client.insert(
        collection_name=collection_name,
        partition_name=partition_name,
        data=data,
        timeout=timeout,
    )
    print(res)


# apply vector embedding on the query and search the same in the vecotr db
@task(name="perform_vector_search")
def perform_vector_search(
    client: MilvusClient,
    collection_name: str,
    query: str,
    limit: int,
    output_fields: list,
):
    query_vector = embedding_model.embed_query(query)
    result = client.search(
        collection_name=collection_name,
        partition_name="partitionA",
        data=[query_vector],
        limit=limit,
        output_fields=output_fields,
    )
    return result


# search in vecotr db with filters applied
@task(name="perform_vector_search_with_filter")
def perform_vector_search_with_filter(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    anns_field: str,
    search_params: dict,
    query: str,
    filter: str,
    limit: int,
    output_fields: list,
    timeout: float,
):
    query_vector = embedding_model.embed_query(query)
    searchResult = client.search(
        collection_name=collection_name,
        partition_names=partition_names,
        search_params=search_params,
        anns_field=anns_field,
        data=[query_vector],
        filter=filter,
        limit=limit,
        output_fields=output_fields,
        timeout=timeout,
    )
    return searchResult


# query for entries in the Collection 
@task(name="perform_query")
def perform_query(
    client: MilvusClient,
    collection_name: str,
    filter: str,
    output_fields: list,
):
    queryResult = client.query(
        collection_name=collection_name,
        filter=filter,
        partition_names=["partitionA"],
        output_fields=output_fields,
    )
    return queryResult


# query the db passing list of ids
@task(name="perform_query_ids")
def perform_query_Ids_partition(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    limit: int,
    ids: list,
    output_fields: list,
    timeout: float,
):
    queryResult = client.query(
        collection_name=collection_name,
        partition_names=partition_names,
        limit=limit,
        ids=ids,
        timeout=timeout,
    )
    return queryResult

# delete entries from the collection
@task(name="delete_entities")
def delete_entities(
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    ids: list = None,
    filter: str = None,
    timeout: float = None,
):
    if ids is not None:
        deleteResult = client.delete(collection_name=collection_name, ids=ids)
        print(deleteResult)
    if filter is not None:
        deleteRes = client.delete(
            collection_name=collection_name,
            timeout=timeout,
            filter=filter,
            partition_name=partition_name,
        )
        print(deleteRes)

# modify data in the collection
@task(name="upsert_entities")
def upsert_entities( 
    client: MilvusClient,
    collection_name: str,
    partition_name: str,
    docs: list,
    ids: list,
    subject: str,
    timeout: float,
):
    vectors = embedding_model.embed_documents(docs)
    data = [
        {"id": ids[i], "vector": vectors[i], "text": docs[i], "subject": subject}
        for i in range(len(vectors))
    ]

    res = client.upsert( 
        collection_name=collection_name,
        partition_name=partition_name,
        data=data,
        timeout=timeout,
    )
    print("Upsert Result:", res)


@task(name="get_entities")
def get_entities(
    client: MilvusClient,
    collection_name: str,
    partition_names: list,
    output_fields: list,
    ids: list,
    timeout: float,
):
    result = client.get(
        collection_name=collection_name,
        partition_names=partition_names,
        output_fields=output_fields,
        ids=ids,
        timeout=timeout,
    )
    return result

@workflow(name="milvus_operations_with_watsonx")  
def milvus_operations_with_watsonx():
    client = setup_milvus_client(
        uri="http://127.0.0.1:19530", collection_name="demo_collection", dimension=768
    )
    partition_name = "partitionA"
    client.create_partition(
        collection_name="demo_collection", partition_name=partition_name
    )

    #  Watsonx Embedding model parameters
    ibm_cloud_url = os.getenv("WATSONX_URL")
    ibm_cloud_api_key = os.getenv("WATSONX_API_KEY")
    model_id = (
        "ibm/slate-125m-english-rtrvr"  # or any other supported model
    )
    project_id=os.getenv("WATSONX_PROJECT_ID")

    initialize_embedding_model(
        ibm_cloud_url=ibm_cloud_url,
        ibm_cloud_api_key=ibm_cloud_api_key,
        model_id=model_id,
        project_id=project_id
    )

    docs_history = [
        "Artificial intelligence was founded as an academic discipline in 1956.",
        "Alan Turing was the first person to conduct substantial research in AI.",
        "Born in Maida Vale, London, Turing was raised in southern England.",
    ]

    encode_documents_and_insert(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=docs_history,
        subject="history",
        timeout=10,
    )  

    # Upsert example
    new_docs_history = [
        "Alan Turing developed the Turing Test.",
        "Artificial intelligence continues to evolve.",
    ]
    new_ids_history = [
        0,
        1,
    ]  
    upsert_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=new_docs_history,
        ids=new_ids_history,
        subject="history",
        timeout=10,
    )

    # Get example
    get_result = get_entities(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        output_fields=["text", "subject"],
        ids=new_ids_history,
        timeout=10,
    )
    print("Get Result:", get_result)

    # Semantic Search
    # Vector search
    result = perform_vector_search(
        client=client,
        collection_name="demo_collection",
        query="Who is Alan Turing?",
        limit=2,
        output_fields=["text", "subject"],
    )
    print(result)

    # Vector Search with Metadata Filtering
    docs_biology = [
        "Machine learning has been used for drug design.",
        "Computational synthesis with AI algorithms predicts molecular properties.",
        "DDR1 is involved in cancers and fibrosis.",
    ]

    encode_documents_and_insert(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        docs=docs_biology,
        subject="biology",
        timeout=10,
    )

    search_params = {"metric_type": "COSINE", "params": {}}

    searchResult = perform_vector_search_with_filter(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        anns_field="vector",
        search_params=search_params,
        query="tell me AI related information",
        filter="subject == 'biology'",
        limit=2,
        output_fields=["text", "subject"],
        timeout=10,
    )
    print(searchResult)

    # Perform Query
    queryResult = perform_query(
        client=client,
        collection_name="demo_collection",
        filter="subject == 'history'",
        output_fields=["text", "subject"],
    )
    print(queryResult)

    # Perform Query with ids as input param
    queryResult = perform_query_Ids_partition(
        client=client,
        collection_name="demo_collection",
        partition_names=[partition_name],
        limit=1,
        ids=[0, 2],
        output_fields=["text", "subject"],
        timeout=10,
    )
    print(queryResult)

    # Delete entities
    delete_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        ids=[0, 2],
        timeout=10,
    )

    # 8. Delete entities by a filter expression
    delete_entities(
        client=client,
        collection_name="demo_collection",
        partition_name=partition_name,
        filter="subject == 'biology'",
        timeout=10,
    )


milvus_operations_with_watsonx()
 

IBM ( watsonx )에 접속하려면 다음 인증 정보가 필요합니다:

export WATSONX_URL=<watsonx-url>
export WATSONX_API_KEY=<watsonx-iam-api-key>
export WATSONX_PROJECT_ID=<watsonx-project-id>
 

설치 및 구성이 제대로 되었는지 확인하려면 샘플 애플리케이션을 실행하십시오.

python WatsonxEmbeddingMilvus.py
 

Instana 로 트레이스 및 메트릭을 내보내려면 다음 내보내기 구성을 사용하십시오:

OpenTelemetry 의 추적 정보, 메트릭 및 로그 데이터를 Instana 에이전트로 전송

에이전트 모드

Instana 에이전트 엔드포인트로 데이터를 전송하도록 애플리케이션을 구성하십시오:

export OTEL_RESOURCE_ATTRIBUTES="INSTANA_PLUGIN=genai"
export TRACELOOP_BASE_URL=<instana-agent-host>:4317
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_METRICS_ENABLED=true
export OTEL_EXPORTER_OTLP_INSECURE=true
 

자세한 포트 정보는 ‘ OpenTelemetry 데이터를 Instana 에이전트로 전송하기’를 참조하십시오.

에이전트 없는 모드

애플리케이션을 설정하여 데이터를 Instana 백엔드로 직접 전송하도록 하세요:

export OTEL_RESOURCE_ATTRIBUTES="INSTANA_PLUGIN=genai"

export TRACELOOP_BASE_URL=<instana-otlp-endpoint>:4317
export TRACELOOP_HEADERS="x-instana-key=<agent-key>,x-instana-host=<instana-host>"
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_METRICS_ENABLED=true
export OTEL_EXPORTER_OTLP_INSECURE=false
 

자세한 내용은 ‘ OpenTelemetry 데이터를 Instana 백엔드로 전송하기’를 참조하십시오.

추적 기록 보기

LLM 애플리케이션 런타임에서 수집된 추적 정보를 확인하기 위한 애플리케이션 관점을 생성하려면 다음 단계를 수행하십시오:

  1. Instana UI에서 다음 방법 중 하나를 사용하여 ‘새 애플리케이션 관점’ 마법사를 엽니다
    • Instana 대시보드의 ‘애플리케이션’ 섹션에서 ‘애플리케이션 추가’를 클릭합니다.
    • 탐색 메뉴에서 ‘애플리케이션 (Applications)’ > ‘추가(Add )’를 클릭한 다음, ‘새 애플리케이션 관점(New Application Perspective)’을 선택합니다.
  2. ‘서비스’ 또는 ‘엔드포인트’를 선택한 다음 ‘다음’을 클릭합니다.
  3. ‘필터 추가’를 클릭하고 서비스 이름을 선택하세요. 조건을 OR 사용하여 여러 서비스와 엔드포인트를 선택할 수 있습니다. 서비스 이름은 의 app_name 매개변수로 Traceloop.init()지정됩니다. 예를 들어, Watsonx_Embeddings_MilvusClient입니다.
  4. ‘애플리케이션 관점 이름’ 필드에 LLM 애플리케이션 관점의 이름을 입력합니다. 그런 다음 작성을 클릭하십시오.

새로운 애플리케이션 관점이 생성되었습니다.

추적 정보를 확인하려면, Instana UI의 탐색 메뉴에서 ‘분석(Analytics) ’을 클릭하십시오. 애널리틱스 대시보드에서 애플리케이션, 서비스 및 엔드포인트를 사용하여 호출을 분석할 수 있습니다. Instana 서비스, 엔드포인트 및 호출 이름을 기준으로 데이터를 표시합니다. . 'Trace->Service Name' equals Watsonx_Embeddings_MilvusClient와 같은 임의의 태그를 사용하여 트레이스나 호출을 필터링하거나 그룹화할 수 있습니다. 자세한 내용은 ‘추적 및 호출 분석’을 참조하십시오.

앞서 작성한 코드에서 수집된 추적 정보는 Instana UI에 표시됩니다.

그림 1. Milvus 추적 기록 가져오기
Milvus 추적 기록 가져오기
그림 2. Milvus 트레이스 삽입
Milvus 트레이스 삽입
그림 3. Milvus 쿼리 추적
Milvus 쿼리 추적
그림 4. Milvus 검색 기록
Milvus 검색 기록

메트릭 보기

Milvus 데이터베이스 작업에서 수집된 메트릭을 확인하려면 다음 단계를 수행하십시오:

  1. Instana UI의 탐색 메뉴에서 ‘인프라’를 선택합니다.
  2. ‘인프라 분석’을 클릭합니다.
  3. 엔티티 유형 목록에서 ‘ OTel ’의 ‘ Milvus ’ DB를 선택하십시오.
  4. OTel ” DB 엔티티 유형의 엔티티 인스턴스를 클릭하십시오. Milvus 관련 대시보드가 표시됩니다.

이 메트릭 대시보드는 Milvus 데이터베이스의 성능 및 상태에 대한 정보를 제공하며, 다음을 포함합니다:

  • 작업 지표 : 시간 경과에 따른 수행된 작업 수(삽입, 업데이트 및 삭제)를 확인합니다.
  • 검색 거리 : 검색 쿼리 벡터와 일치하는 벡터들 사이의 거리.
  • 지연 시간 지표: 쿼리 작업의 응답 시간을 모니터링합니다.
그림 5. Milvus 지표 대시보드
Milvus 지표 대시보드