Milvus
Milvus est une base de données vectorielle open source conçue pour stocker et rechercher efficacement des données vectorielles dynamiques à grande échelle. Il a été développé à l'aide de Facebook Faiss, une bibliothèque open source d' C++ s destinée à la recherche de similitudes vectorielles. L'utilisation d' Milvus offre un environnement permettant de créer, de gérer et d'interroger efficacement des données vectorielles, facilitant ainsi le développement d'applications intelligentes.
Milvus Observability with Instana
En utilisant OpenTelemetry avec Instana, vous pouvez collecter des traces pour les opérations sur la base de données Milvus, telles que create, insert, upsert et delete.
Milvus configuration
Avant de commencer, assurez-vous que votre environnement répond à toutes les conditions préalables. Pour plus d'informations, consultez la section « Conditions préalables ».
Il existe plusieurs façons de se connecter localement à Milvus. La méthode suivante est l'une des méthodes permettant de se connecter à l'aide d' Docker
Pour commencer à utiliser Milvus dans votre projet, vous devez installer et configurer Milvus. Vous pouvez également l'installer sur votre ordinateur en utilisant Docker.
Installer l' Docker : assurez-vous d'avoir installé l' Docker sur votre système. Vous pouvez le télécharger depuis le site officiel d' Docker.
Installation d' Milvus : le site Milvus met à disposition un fichier de configuration « compose » pour Docker dans le référentiel Milvus. Pour installer Milvus à l'aide de la commande ` Docker `, procédez comme suit :
Téléchargez le fichier de configuration « docker » en exécutant la commande suivante :
wget https://github.com/milvus-io/milvus/releases/download/v2.5.5/milvus-standalone-docker-compose.yml -O docker-compose.ymlLancez « Milvus » en exécutant la commande suivante :
sudo docker compose up -dVous obtiendrez le résultat suivant :
Creating milvus-etcd ... done Creating milvus-minio ... done Creating milvus-standalone ... done
Une fois que vous aurez lancé Milvus, les conteneurs suivants seront opérationnels : milvus-standalone, milvus-minio, et milvus-etcd.
Vous pouvez vérifier si les conteneurs sont opérationnels à l'aide de la commande suivante :
sudo docker-compose ps
Vous obtiendrez le résultat suivant :
Name Command State Ports
--------------------------------------------------------------------------------------------------------------------
milvus-etcd etcd -advertise-client-url ... Up 2379/tcp, 2380/tcp
milvus-minio /usr/bin/docker-entrypoint ... Up (healthy) 9000/tcp
milvus-standalone /tini -- milvus run standalone Up 0.0.0.0:19530->19530/tcp, 0.0.0.0:9091->9091/tcp
Une fois le conteneur démarré, vérifiez que votre serveur Milvus est bien en cours d'exécution en vous rendant sur http://localhost:19530.
Pour installer PyMilvus,, exécutez la commande suivante :
pip install pymilvus
Pour installer les dépendances d' IBM watsonx, exécutez la commande suivante :
pip install ibm-watsonx-ai==1.1.20 langchain-ibm==0.3.1
Dans votre application LLM, initialisez le traceur Traceloop en exécutant la commande suivante :
from traceloop.sdk import Traceloop
Traceloop.init()
L'exemple d'application suivant montre comment se connecter à Milvus, insérer des données dans la base de données et effectuer des opérations de création, lecture, mise à jour et suppression (CRUD), notamment l'insertion, la suppression, l'upsert, la recherche, la récupération et l'interrogation.
Vous pouvez utiliser le code suivant pour générer une application d'exemple nommée WatsonxEmbeddingMilvus.py:
from pymilvus import MilvusClient
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task
from langchain_ibm.embeddings import WatsonxEmbeddings
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames
import os
Traceloop.init(app_name="Watsonx_Embeddings_MilvusClient")
# connect to Milvus Locally
@task(name="setup_milvus_client")
def setup_milvus_client(uri: str, collection_name: str, dimension: int):
client = MilvusClient(uri=uri)
##create a collection in Milvus DB
if client.has_collection(collection_name=collection_name):
client.drop_collection(collection_name=collection_name)
client.create_collection(
collection_name=collection_name, dimension=dimension, timeout=10, metric_type="COSINE"
)
return client
embedding_model = None # Define Embedding Model globally
#Initialize watsonx embedding model
@task(name="initialize_embedding_model")
def initialize_embedding_model(
ibm_cloud_url: str,
ibm_cloud_api_key: str,
model_id: str,
project_id: str,
model_kwargs: dict = None,
encode_kwargs: dict = None,
):
embed_params = {
EmbedTextParamsMetaNames.TRUNCATE_INPUT_TOKENS: 3,
EmbedTextParamsMetaNames.RETURN_OPTIONS: {"input_text": True},
}
global embedding_model
model_kwargs = model_kwargs or {}
encode_kwargs = encode_kwargs or {"normalize_embeddings": False}
embedding_model = WatsonxEmbeddings(
url=ibm_cloud_url,
project_id=project_id,
model_id=model_id,
apikey=ibm_cloud_api_key,
params=embed_params
)
# embed Documents and insert it to Milvus DB
@task(name="encode_documents_and_insert")
def encode_documents_and_insert(
client: MilvusClient,
collection_name: str,
partition_name: str,
docs: list,
subject: str,
timeout: float,
):
vectors = embedding_model.embed_documents(docs)
data = [
{"id": i, "vector": vectors[i], "text": docs[i], "subject": subject}
for i in range(len(vectors))
]
res = client.insert(
collection_name=collection_name,
partition_name=partition_name,
data=data,
timeout=timeout,
)
print(res)
# apply vector embedding on the query and search the same in the vecotr db
@task(name="perform_vector_search")
def perform_vector_search(
client: MilvusClient,
collection_name: str,
query: str,
limit: int,
output_fields: list,
):
query_vector = embedding_model.embed_query(query)
result = client.search(
collection_name=collection_name,
partition_name="partitionA",
data=[query_vector],
limit=limit,
output_fields=output_fields,
)
return result
# search in vecotr db with filters applied
@task(name="perform_vector_search_with_filter")
def perform_vector_search_with_filter(
client: MilvusClient,
collection_name: str,
partition_names: list,
anns_field: str,
search_params: dict,
query: str,
filter: str,
limit: int,
output_fields: list,
timeout: float,
):
query_vector = embedding_model.embed_query(query)
searchResult = client.search(
collection_name=collection_name,
partition_names=partition_names,
search_params=search_params,
anns_field=anns_field,
data=[query_vector],
filter=filter,
limit=limit,
output_fields=output_fields,
timeout=timeout,
)
return searchResult
# query for entries in the Collection
@task(name="perform_query")
def perform_query(
client: MilvusClient,
collection_name: str,
filter: str,
output_fields: list,
):
queryResult = client.query(
collection_name=collection_name,
filter=filter,
partition_names=["partitionA"],
output_fields=output_fields,
)
return queryResult
# query the db passing list of ids
@task(name="perform_query_ids")
def perform_query_Ids_partition(
client: MilvusClient,
collection_name: str,
partition_names: list,
limit: int,
ids: list,
output_fields: list,
timeout: float,
):
queryResult = client.query(
collection_name=collection_name,
partition_names=partition_names,
limit=limit,
ids=ids,
timeout=timeout,
)
return queryResult
# delete entries from the collection
@task(name="delete_entities")
def delete_entities(
client: MilvusClient,
collection_name: str,
partition_name: str,
ids: list = None,
filter: str = None,
timeout: float = None,
):
if ids is not None:
deleteResult = client.delete(collection_name=collection_name, ids=ids)
print(deleteResult)
if filter is not None:
deleteRes = client.delete(
collection_name=collection_name,
timeout=timeout,
filter=filter,
partition_name=partition_name,
)
print(deleteRes)
# modify data in the collection
@task(name="upsert_entities")
def upsert_entities(
client: MilvusClient,
collection_name: str,
partition_name: str,
docs: list,
ids: list,
subject: str,
timeout: float,
):
vectors = embedding_model.embed_documents(docs)
data = [
{"id": ids[i], "vector": vectors[i], "text": docs[i], "subject": subject}
for i in range(len(vectors))
]
res = client.upsert(
collection_name=collection_name,
partition_name=partition_name,
data=data,
timeout=timeout,
)
print("Upsert Result:", res)
@task(name="get_entities")
def get_entities(
client: MilvusClient,
collection_name: str,
partition_names: list,
output_fields: list,
ids: list,
timeout: float,
):
result = client.get(
collection_name=collection_name,
partition_names=partition_names,
output_fields=output_fields,
ids=ids,
timeout=timeout,
)
return result
@workflow(name="milvus_operations_with_watsonx")
def milvus_operations_with_watsonx():
client = setup_milvus_client(
uri="http://127.0.0.1:19530", collection_name="demo_collection", dimension=768
)
partition_name = "partitionA"
client.create_partition(
collection_name="demo_collection", partition_name=partition_name
)
# Watsonx Embedding model parameters
ibm_cloud_url = os.getenv("WATSONX_URL")
ibm_cloud_api_key = os.getenv("WATSONX_API_KEY")
model_id = (
"ibm/slate-125m-english-rtrvr" # or any other supported model
)
project_id=os.getenv("WATSONX_PROJECT_ID")
initialize_embedding_model(
ibm_cloud_url=ibm_cloud_url,
ibm_cloud_api_key=ibm_cloud_api_key,
model_id=model_id,
project_id=project_id
)
docs_history = [
"Artificial intelligence was founded as an academic discipline in 1956.",
"Alan Turing was the first person to conduct substantial research in AI.",
"Born in Maida Vale, London, Turing was raised in southern England.",
]
encode_documents_and_insert(
client=client,
collection_name="demo_collection",
partition_name=partition_name,
docs=docs_history,
subject="history",
timeout=10,
)
# Upsert example
new_docs_history = [
"Alan Turing developed the Turing Test.",
"Artificial intelligence continues to evolve.",
]
new_ids_history = [
0,
1,
]
upsert_entities(
client=client,
collection_name="demo_collection",
partition_name=partition_name,
docs=new_docs_history,
ids=new_ids_history,
subject="history",
timeout=10,
)
# Get example
get_result = get_entities(
client=client,
collection_name="demo_collection",
partition_names=[partition_name],
output_fields=["text", "subject"],
ids=new_ids_history,
timeout=10,
)
print("Get Result:", get_result)
# Semantic Search
# Vector search
result = perform_vector_search(
client=client,
collection_name="demo_collection",
query="Who is Alan Turing?",
limit=2,
output_fields=["text", "subject"],
)
print(result)
# Vector Search with Metadata Filtering
docs_biology = [
"Machine learning has been used for drug design.",
"Computational synthesis with AI algorithms predicts molecular properties.",
"DDR1 is involved in cancers and fibrosis.",
]
encode_documents_and_insert(
client=client,
collection_name="demo_collection",
partition_name=partition_name,
docs=docs_biology,
subject="biology",
timeout=10,
)
search_params = {"metric_type": "COSINE", "params": {}}
searchResult = perform_vector_search_with_filter(
client=client,
collection_name="demo_collection",
partition_names=[partition_name],
anns_field="vector",
search_params=search_params,
query="tell me AI related information",
filter="subject == 'biology'",
limit=2,
output_fields=["text", "subject"],
timeout=10,
)
print(searchResult)
# Perform Query
queryResult = perform_query(
client=client,
collection_name="demo_collection",
filter="subject == 'history'",
output_fields=["text", "subject"],
)
print(queryResult)
# Perform Query with ids as input param
queryResult = perform_query_Ids_partition(
client=client,
collection_name="demo_collection",
partition_names=[partition_name],
limit=1,
ids=[0, 2],
output_fields=["text", "subject"],
timeout=10,
)
print(queryResult)
# Delete entities
delete_entities(
client=client,
collection_name="demo_collection",
partition_name=partition_name,
ids=[0, 2],
timeout=10,
)
# 8. Delete entities by a filter expression
delete_entities(
client=client,
collection_name="demo_collection",
partition_name=partition_name,
filter="subject == 'biology'",
timeout=10,
)
milvus_operations_with_watsonx()
Pour accéder à IBM watsonx, vous devez disposer des identifiants suivants :
export WATSONX_URL=<watsonx-url>
export WATSONX_API_KEY=<watsonx-iam-api-key>
export WATSONX_PROJECT_ID=<watsonx-project-id>
Lancez l'application d'exemple pour vérifier l'installation et la configuration.
python WatsonxEmbeddingMilvus.py
Pour exporter des traces et des métriques vers Instana, utilisez la configuration d'exportation suivante :
Envoi des données de traces, de métriques et de journaux d' OpenTelemetry s à l'agent Instana
Mode de l'agent
Configurez votre application pour qu'elle envoie des données au point de terminaison de l'agent « Instana » :
export OTEL_RESOURCE_ATTRIBUTES="INSTANA_PLUGIN=genai"
export TRACELOOP_BASE_URL=<instana-agent-host>:4317
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_METRICS_ENABLED=true
export OTEL_EXPORTER_OTLP_INSECURE=true
Pour plus d'informations sur les ports, consultez la section « Envoi de données d' OpenTelemetry s à l'agent d' Instana ».
Mode sans agent
Configurez votre application pour qu'elle envoie les données directement au serveur d' Instana :
export OTEL_RESOURCE_ATTRIBUTES="INSTANA_PLUGIN=genai"
export TRACELOOP_BASE_URL=<instana-otlp-endpoint>:4317
export TRACELOOP_HEADERS="x-instana-key=<agent-key>,x-instana-host=<instana-host>"
export TRACELOOP_LOGGING_ENABLED=true
export TRACELOOP_METRICS_ENABLED=true
export OTEL_EXPORTER_OTLP_INSECURE=false
Pour plus d'informations, consultez la section « Envoi de données d' OpenTelemetry s vers le backend Instana ».
Affichage des traces
Pour créer une perspective d'application permettant de consulter les informations de trace recueillies lors de l'exécution de l'application LLM, procédez comme suit :
- Dans l'interface utilisateur d' Instana, ouvrez l'assistant « New Application Perspective » de l'une des manières suivantes :
- Dans le tableau de bord d' Instana, dans la section Applications, cliquez sur Ajouter une application.
- Dans le menu de navigation, cliquez sur Applications > Ajouter, puis sélectionnez Nouvelle perspective d'application.
- Sélectionnez « Services » ou « Points de terminaison », puis cliquez sur « Suivant ».
- Cliquez sur « Ajouter un filtre » et sélectionnez le nom d'un service. Vous pouvez sélectionner plusieurs services et points de terminaison à l'aide de
ORconditions. Le nom du service est spécifié par leapp_nameparamètre dansTraceloop.init(). Par exemple,Watsonx_Embeddings_MilvusClient. - Dans le champ « Nom de la perspective d'application », saisissez un nom pour la perspective d'application LLM. Cliquez ensuite sur Créer.
La nouvelle perspective d'application est créée.
Pour consulter les informations de suivi, accédez au menu de navigation de l'interface utilisateur d' Instana, puis cliquez sur « Analytics ». Dans le tableau de bord Analytics, vous pouvez utiliser les applications, les services et les points de terminaison pour analyser les appels. Instana présente les données en utilisant les noms des services, des points de terminaison et des appels. Vous pouvez filtrer et regrouper les traces ou les appels à l'aide de balises de votre choix, par exemple en les filtrant par 'Trace->Service Name' equals Watsonx_Embeddings_MilvusClient. Pour plus d'informations, consultez la section « Analyse des traces et des appels ».
Les traces collectées à partir du code précédent s'affichent dans l'interface utilisateur d' Instana.




Affichage des mesures
Pour consulter les indicateurs recueillis à partir des opérations de votre base de données Milvus, procédez comme suit :
- Dans le menu de navigation de l'interface utilisateur d' Instana, sélectionnez « Infrastructure ».
- Cliquez sur « Analyser l'infrastructure ».
- Dans la liste des types d'entités, sélectionnez « OTel Milvus DB ».
- Cliquez sur l'instance de l'entité « OTel » du type d'entité de base de données « Milvus ». Le tableau de bord correspondant s'affiche.
Le tableau de bord des indicateurs fournit des informations sur les performances et l'état de santé de votre base de données Milvus, notamment :
- Indicateurs d'opérations : consultez le nombre d'opérations effectuées (insertions, mises à jour avec insertion et suppressions) au fil du temps.
- Distance de recherche : distance entre le vecteur de requête et les vecteurs correspondants.
- Indicateurs de latence : surveillez le temps de réponse des opérations de requête.
