Milvus
Milvus 대규모의 동적 벡터 데이터를 효율적으로 저장하고 검색할 수 있도록 설계된 오픈 소스 벡터 데이터베이스입니다. 이 프로젝트는 벡터 유사도 검색을 위한 오픈소스 C++ 라이브러리인 Facebook Faiss를 기반으로 개발되었습니다. Milvus 를 사용하면 벡터 데이터를 효율적으로 생성, 관리 및 쿼리할 수 있는 환경을 제공하여 지능형 애플리케이션 개발을 용이하게 합니다.
Milvus Observability with Instana
OpenTelemetry 와 Instana 를 함께 사용하면 create, insert, upsert, delete와 같은 Milvus 데이터베이스 작업에 대한 트레이스를 수집할 수 있습니다.
Milvus 설정
시작하기 전에, 사용 중인 환경이 모든 필수 조건을 충족하는지 확인하십시오. 자세한 내용은 ‘필수 조건’을 참조하십시오.
Milvus 에 로컬로 연결하는 방법은 여러 가지가 있습니다. 다음은 Docker 를 사용하여 연결하는 방법 중 하나입니다.
프로젝트에서 Milvus 를 사용하려면 Milvus 를 설치하고 설정해야 합니다. Docker 를 사용하여 로컬 컴퓨터에 설치할 수도 있습니다.
Docker 설치 : 시스템에 Docker 가 설치되어 있는지 확인하십시오. Docker 공식 웹사이트에서 다운로드할 수 있습니다.
Milvus 설치 : Milvus 은 Milvus 저장소에서 Docker 컴포즈 구성 파일을 제공합니다. Docker 의 Compose를 사용하여 Milvus 를 설치하려면 다음 단계를 따르세요:
다음 명령을 실행하여 ` docker ` 구성 파일을 다운로드하십시오:
wget https://github.com/milvus-io/milvus/releases/download/v2.5.5/milvus-standalone-docker-compose.yml -O docker-compose.yml다음 명령을 실행하여 Milvus 를 시작하십시오:
sudo docker compose up -d결과물은 다음과 같습니다.
Creating milvus-etcd ... done Creating milvus-minio ... done Creating milvus-standalone ... done
Milvus 를 시작하면,, milvus-minio 및 컨테이너가 milvus-standalone milvus-etcd실행됩니다.
다음 명령어를 사용하여 컨테이너가 정상적으로 실행 중인지 확인할 수 있습니다:
sudo docker-compose ps
결과물은 다음과 같습니다.
Name Command State Ports
--------------------------------------------------------------------------------------------------------------------
milvus-etcd etcd -advertise-client-url ... Up 2379/tcp, 2380/tcp
milvus-minio /usr/bin/docker-entrypoint ... Up (healthy) 9000/tcp
milvus-standalone /tini -- milvus run standalone Up 0.0.0.0:19530->19530/tcp, 0.0.0.0:9091->9091/tcp
컨테이너를 시작한 후, 에 접속하여 http://localhost:19530Milvus 서버가 실행 중인지 확인하십시오.
PyMilvus, 를 설치하려면 다음 명령을 실행하십시오:
pip install pymilvus
IBM 의 종속성을 설치하려면 watsonx, 다음 명령어를 실행하십시오:
pip install ibm-watsonx-ai==1.1.20 langchain-ibm==0.3.1
LLM 애플리케이션에서 다음 명령을 실행하여 Traceloop 트레이서를 초기화하십시오:
from traceloop.sdk import Traceloop
Traceloop.init()
다음 샘플 애플리케이션은 Milvus 에 연결하고, 데이터베이스에 데이터를 삽입하며, 삽입(insert), 삭제(delete), 업서트(upsert), 검색(search), 가져오기(get), 쿼리(query) 등을 포함하는 생성(create), 읽기(read), 업데이트(update), 삭제(delete) 작업(CRUD)을 수행하는 방법을 보여줍니다.
다음 코드를 사용하여 다음과 같은 WatsonxEmbeddingMilvus.py이름의 샘플 애플리케이션을 생성할 수 있습니다:
from pymilvus import MilvusClient
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task
from langchain_ibm.embeddings import WatsonxEmbeddings
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames
import os
Traceloop.init(app_name="Watsonx_Embeddings_MilvusClient")
# connect to Milvus Locally
@task(name="setup_milvus_client")
def setup_milvus_client(uri: str, collection_name: str, dimension: int):
client = MilvusClient(uri=uri)
##create a collection in Milvus DB
if client.has_collection(collection_name=collection_name):
client.drop_collection(collection_name=collection_name)
client.create_collection(
collection_name=collection_name, dimension=dimension, timeout=10, metric_type="COSINE"
)
return client
embedding_model = None # Define Embedding Model globally
#Initialize watsonx embedding model
@task(name="initialize_embedding_model")
def initialize_embedding_model(
ibm_cloud_url: str,
ibm_cloud_api_key: str,
model_id: str,
project_id: str,
model_kwargs: dict = None,
encode_kwargs: dict = None,
):
embed_params = {
EmbedTextParamsMetaNames.TRUNCATE_INPUT_TOKENS: 3,
EmbedTextParamsMetaNames.RETURN_OPTIONS: {"input_text": True},
}
global embedding_model
model_kwargs = model_kwargs or {}
encode_kwargs = encode_kwargs or {"normalize_embeddings": False}
embedding_model = WatsonxEmbeddings(
url=ibm_cloud_url,
project_id=project_id,
model_id=model_id,
apikey=ibm_cloud_api_key,
params=embed_params
)
# embed Documents and insert it to Milvus DB
@task(name="encode_documents_and_insert")
def encode_documents_and_insert(
client: MilvusClient,
collection_name: str,
partition_name: str,
docs: list,
subject: str,
timeout: float,
):
vectors = embedding_model.embed_documents(docs)
data = [
{"id": i, "vector": vectors[i], "text": docs[i], "subject": subject}
for i in range(len(vectors))
]
res = client.insert(
collection_name=collection_name,
partition_name=partition_name,
data=data,
timeout=timeout,
)
print(res)
# apply vector embedding on the query and search the same in the vecotr db
@task(name="perform_vector_search")
def perform_vector_search(
client: MilvusClient,
collection_name: str,
query: str,
limit: int,
output_fields: list,
):
query_vector = embedding_model.embed_query(query)
result = client.search(
collection_name=collection_name,
partition_name="partitionA",
data=[query_vector],
limit=limit,
output_fields=output_fields,
)
return result
# search in vecotr db with filters applied
@task(name="perform_vector_search_with_filter")
def perform_vector_search_with_filter(
client: MilvusClient,
collection_name: str,
partition_names: list,
anns_field: str,
search_params: dict,
query: str,
filter: str,
limit: int,
output_fields: list,
timeout: float,
):
query_vector = embedding_model.embed_query(query)
searchResult = client.search(
collection_name=collection_name,
partition_names=partition_names,
search_params=search_params,
anns_field=anns_field,
data=[query_vector],
filter=filter,
limit=limit,
output_fields=output_fields,
timeout=timeout,
)
return searchResult
# query for entries in the Collection
@task(name="perform_query")
def perform_query(
client: MilvusClient,
collection_name: str,
filter: str,
output_fields: list,
):
queryResult = client.query(
collection_name=collection_name,
filter=filter,
partition_names=["partitionA"],
output_fields=output_fields,
)
return queryResult
# query the db passing list of ids
@task(name="perform_query_ids")
def perform_query_Ids_partition(
client: MilvusClient,
collection_name: str,
partition_names: list,
limit: int,
ids: list,
output_fields: list,
timeout: float,
):
queryResult = client.query(
collection_name=collection_name,
partition_names=partition_names,
limit=limit,
ids=ids,
timeout=timeout,
)
return queryResult
# delete entries from the collection
@task(name="delete_entities")
def delete_entities(
client: MilvusClient,
collection_name: str,
partition_name: str,
ids: list = None,
filter: str = None,
timeout: float = None,
):
if ids is not None:
deleteResult = client.delete(collection_name=collection_name, ids=ids)
print(deleteResult)
if filter is not None:
deleteRes = client.delete(
collection_name=collection_name,
timeout=timeout,
filter=filter,
partition_name=partition_name,
)
print(deleteRes)
# modify data in the collection
@task(name="upsert_entities")
def upsert_entities(
client: MilvusClient,
collection_name: str,
partition_name: str,
docs: list,
ids: list,
subject: str,
timeout: float,
):
vectors = embedding_model.embed_documents(docs)
data = [
{"id": ids[i], "vector": vectors[i], "text": docs[i], "subject": subject}
for i in range(len(vectors))
]
res = client.upsert(
collection_name=collection_name,
partition_name=partition_name,
data=data,
timeout=timeout,
)
print("Upsert Result:", res)
@task(name="get_entities")
def get_entities(
client: MilvusClient,
collection_name: str,
partition_names: list,
output_fields: list,
ids: list,
timeout: float,
):
result = client.get(
collection_name=collection_name,
partition_names=partition_names,
output_fields=output_fields,
ids=ids,
timeout=timeout,
)
return result
@workflow(name="milvus_operations_with_watsonx")
def milvus_operations_with_watsonx():
client = setup_milvus_client(
uri="http://127.0.0.1:19530", collection_name="demo_collection", dimension=768
)
partition_name = "partitionA"
client.create_partition(
collection_name="demo_collection", partition_name=partition_name
)
# Watsonx Embedding model parameters
ibm_cloud_url = os.getenv("WATSONX_URL")
ibm_cloud_api_key = os.getenv("WATSONX_API_KEY")
model_id = (
"ibm/slate-125m-english-rtrvr" # or any other supported model
)
project_id=os.getenv("WATSONX_PROJECT_ID")
initialize_embedding_model(
ibm_cloud_url=ibm_cloud_url,
ibm_cloud_api_key=ibm_cloud_api_key,
model_id=model_id,
project_id=project_id
)
docs_history = [
"Artificial intelligence was founded as an academic discipline in 1956.",
"Alan Turing was the first person to conduct substantial research in AI.",
"Born in Maida Vale, London, Turing was raised in southern England.",
]
encode_documents_and_insert(
client=client,
collection_name="demo_collection",
partition_name=partition_name,
docs=docs_history,
subject="history",
timeout=10,
)
# Upsert example
new_docs_history = [
"Alan Turing developed the Turing Test.",
"Artificial intelligence continues to evolve.",
]
new_ids_history = [
0,
1,
]
upsert_entities(
client=client,
collection_name="demo_collection",
partition_name=partition_name,
docs=new_docs_history,
ids=new_ids_history,
subject="history",
timeout=10,
)
# Get example
get_result = get_entities(
client=client,
collection_name="demo_collection",
partition_names=[partition_name],
output_fields=["text", "subject"],
ids=new_ids_history,
timeout=10,
)
print("Get Result:", get_result)
# Semantic Search
# Vector search
result = perform_vector_search(
client=client,
collection_name="demo_collection",
query="Who is Alan Turing?",
limit=2,
output_fields=["text", "subject"],
)
print(result)
# Vector Search with Metadata Filtering
docs_biology = [
"Machine learning has been used for drug design.",
"Computational synthesis with AI algorithms predicts molecular properties.",
"DDR1 is involved in cancers and fibrosis.",
]
encode_documents_and_insert(
client=client,
collection_name="demo_collection",
partition_name=partition_name,
docs=docs_biology,
subject="biology",
timeout=10,
)
search_params = {"metric_type": "COSINE", "params": {}}
searchResult = perform_vector_search_with_filter(
client=client,
collection_name="demo_collection",
partition_names=[partition_name],
anns_field="vector",
search_params=search_params,
query="tell me AI related information",
filter="subject == 'biology'",
limit=2,
output_fields=["text", "subject"],
timeout=10,
)
print(searchResult)
# Perform Query
queryResult = perform_query(
client=client,
collection_name="demo_collection",
filter="subject == 'history'",
output_fields=["text", "subject"],
)
print(queryResult)
# Perform Query with ids as input param
queryResult = perform_query_Ids_partition(
client=client,
collection_name="demo_collection",
partition_names=[partition_name],
limit=1,
ids=[0, 2],
output_fields=["text", "subject"],
timeout=10,
)
print(queryResult)
# Delete entities
delete_entities(
client=client,
collection_name="demo_collection",
partition_name=partition_name,
ids=[0, 2],
timeout=10,
)
# 8. Delete entities by a filter expression
delete_entities(
client=client,
collection_name="demo_collection",
partition_name=partition_name,
filter="subject == 'biology'",
timeout=10,
)
milvus_operations_with_watsonx()
IBM ( watsonx )에 접속하려면 다음 인증 정보가 필요합니다:
export WATSONX_URL=<watsonx-url>
export WATSONX_API_KEY=<watsonx-iam-api-key>
export WATSONX_PROJECT_ID=<watsonx-project-id>
설치 및 구성이 제대로 되었는지 확인하려면 샘플 애플리케이션을 실행하십시오.
python WatsonxEmbeddingMilvus.py
Instana 로 트레이스 및 메트릭을 내보내려면 다음 내보내기 구성을 사용하십시오:
OpenTelemetry 의 추적 정보, 메트릭 및 로그 데이터를 Instana 에이전트로 전송
에이전트 모드
Instana 에이전트 엔드포인트로 데이터를 전송하도록 애플리케이션을 구성하십시오:
export OTEL_RESOURCE_ATTRIBUTES="INSTANA_PLUGIN=genai"
export TRACELOOP_BASE_URL=<instana-agent-host>:4317
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_METRICS_ENABLED=true
export OTEL_EXPORTER_OTLP_INSECURE=true
자세한 포트 정보는 ‘ OpenTelemetry 데이터를 Instana 에이전트로 전송하기’를 참조하십시오.
에이전트 없는 모드
애플리케이션을 설정하여 데이터를 Instana 백엔드로 직접 전송하도록 하세요:
export OTEL_RESOURCE_ATTRIBUTES="INSTANA_PLUGIN=genai"
export TRACELOOP_BASE_URL=<instana-otlp-endpoint>:4317
export TRACELOOP_HEADERS="x-instana-key=<agent-key>,x-instana-host=<instana-host>"
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_METRICS_ENABLED=true
export OTEL_EXPORTER_OTLP_INSECURE=false
자세한 내용은 ‘ OpenTelemetry 데이터를 Instana 백엔드로 전송하기’를 참조하십시오.
추적 기록 보기
LLM 애플리케이션 런타임에서 수집된 추적 정보를 확인하기 위한 애플리케이션 관점을 생성하려면 다음 단계를 수행하십시오:
- Instana UI에서 다음 방법 중 하나를 사용하여 ‘새 애플리케이션 관점’ 마법사를 엽니다
- Instana 대시보드의 ‘애플리케이션’ 섹션에서 ‘애플리케이션 추가’를 클릭합니다.
- 탐색 메뉴에서 ‘애플리케이션 (Applications)’ > ‘추가(Add )’를 클릭한 다음, ‘새 애플리케이션 관점(New Application Perspective)’을 선택합니다.
- ‘서비스’ 또는 ‘엔드포인트’를 선택한 다음 ‘다음’을 클릭합니다.
- ‘필터 추가’를 클릭하고 서비스 이름을 선택하세요. 조건을
OR사용하여 여러 서비스와 엔드포인트를 선택할 수 있습니다. 서비스 이름은 의app_name매개변수로Traceloop.init()지정됩니다. 예를 들어,Watsonx_Embeddings_MilvusClient입니다. - ‘애플리케이션 관점 이름’ 필드에 LLM 애플리케이션 관점의 이름을 입력합니다. 그런 다음 작성을 클릭하십시오.
새로운 애플리케이션 관점이 생성되었습니다.
추적 정보를 확인하려면, Instana UI의 탐색 메뉴에서 ‘분석(Analytics) ’을 클릭하십시오. 애널리틱스 대시보드에서 애플리케이션, 서비스 및 엔드포인트를 사용하여 호출을 분석할 수 있습니다. Instana 서비스, 엔드포인트 및 호출 이름을 기준으로 데이터를 표시합니다. . 'Trace->Service Name' equals Watsonx_Embeddings_MilvusClient와 같은 임의의 태그를 사용하여 트레이스나 호출을 필터링하거나 그룹화할 수 있습니다. 자세한 내용은 ‘추적 및 호출 분석’을 참조하십시오.
앞서 작성한 코드에서 수집된 추적 정보는 Instana UI에 표시됩니다.




메트릭 보기
Milvus 데이터베이스 작업에서 수집된 메트릭을 확인하려면 다음 단계를 수행하십시오:
- Instana UI의 탐색 메뉴에서 ‘인프라’를 선택합니다.
- ‘인프라 분석’을 클릭합니다.
- 엔티티 유형 목록에서 ‘ OTel ’의 ‘ Milvus ’ DB를 선택하십시오.
- “ OTel ” DB 엔티티 유형의 엔티티 인스턴스를 클릭하십시오. Milvus 관련 대시보드가 표시됩니다.
이 메트릭 대시보드는 Milvus 데이터베이스의 성능 및 상태에 대한 정보를 제공하며, 다음을 포함합니다:
- 작업 지표 : 시간 경과에 따른 수행된 작업 수(삽입, 업데이트 및 삭제)를 확인합니다.
- 검색 거리 : 검색 쿼리 벡터와 일치하는 벡터들 사이의 거리.
- 지연 시간 지표: 쿼리 작업의 응답 시간을 모니터링합니다.
