Date: 2025-12-30

Large language models (LLMs) have remarkable text generation and reasoning abilities but often produce factual inaccuracies or hallucinations because they rely on internal knowledge alone. Retrieval augmented generation (RAG) aims to resolve this issue by injecting external documents into the model’s context. However, traditional RAG approaches retrieve a fixed number of passages regardless of their necessity or quality, leading to redundancy, inefficiency and inconsistent factual grounding.

The self-RAG framework provides a practical solution to this problem. It retrieves information on demand by using special control tokens that dynamically decide when and how to perform retrieval during generation. Unlike agentic or multi-agent approaches that coordinate multiple models or components, self-RAG is a model-centric framework where a single model manages the retrieval decision, generation and critique internally. Its self-critique process is a structured step in which the model evaluates both its own output and the quality of the retrieved information, allowing it to adapt its retrieval behavior through self-reflection tokens. Because a single model, trained end-to-end, handles retrieval decisions, generation and self-critique, text generation becomes more efficient, factual and controllable.

This method was originally introduced in the paper Self-RAG: Learning to Retrieve, Generate, and Critique Through Self-Reflection (2024), which explores how fine-tuning LLMs for self-evaluation can improve factual consistency in natural language processing (NLP) tasks.

How self-RAG works

The workflow of self-RAG is orchestrated by special reflection tokens that the model generates alongside its text output, making the entire inference process dynamic and controllable. A single LLM takes on both the generator and critic roles. When additional information is needed, a retriever component fetches relevant external passages, and the same LLM then uses reflection tokens to evaluate and refine its own generation during inference. This architecture reflects a broader trend in artificial intelligence (AI) toward models capable of introspection and dynamic reasoning, building on advances in prompt engineering and long-form generation.

1. On-demand retrieval

The LLM first generates a retrieval token to determine whether external factual information is necessary for the query. If the retrieval token is decoded as “yes,” a retriever is called to fetch a set of relevant passages from an external knowledge base. If the model concludes that retrieval is not necessary, it skips the remaining retrieval-based steps and continues with standard generation. This step ensures that retrieval occurs only when its expected utility is high.

2. Passage retrieval and generation

If retrieval is required, the retriever fetches relevant passages from an external knowledge base. The LLM then processes the input together with each retrieved passage in parallel and generates a candidate text continuation for each passage.

3. Generate and reflect on retrieved passages

For each segment generated, the model concurrently generates special critique tokens that are embedded directly within the output sequence. These tokens are not separate evaluations. Rather, they appear as part of the generated sequence and help the model check its own work as it goes:
• ISREL (Relevance): Assesses whether the retrieved passage is relevant and useful for the query.
• ISSUP (Support or factuality): Evaluates whether the generated text segment has full, partial or no factual support from the source material.
• ISUSE (Utility): Evaluates the generated segment’s overall quality, usefulness and structure.
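As a rough illustration of how such inline tokens could be read back out of a generated segment, the sketch below uses the <|NAME=value|> notation that this tutorial adopts later in its prompts. The function name parse_reflection_tokens and the sample string are hypothetical, and the original Self-RAG model uses its own token vocabulary.

```python
import re

# Matches tokens written as <|NAME=value|>, the notation used later in this
# tutorial's prompts (an assumption; other setups may encode tokens differently).
TOKEN_PATTERN = re.compile(r"<\|(ISREL|ISSUP|ISUSE)=(.+?)\|>")


def parse_reflection_tokens(generated: str) -> dict:
    """Return the plain answer text and the reflection-token values of a segment."""
    tokens = {name: value.strip() for name, value in TOKEN_PATTERN.findall(generated)}
    text = TOKEN_PATTERN.sub("", generated).strip()
    return {"text": text, "tokens": tokens}


# Hypothetical model output for illustration only.
sample = "Sites must keep audit trails.<|ISREL=Relevant|><|ISSUP=Fully Supported|><|ISUSE=4|>"
parsed = parse_reflection_tokens(sample)
print(parsed["text"])
print(parsed["tokens"])
```

Keeping the tokens inline means a single pass over the output yields both the answer text and its self-assessment, with no second model call.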

4. Inference

During inference, reflection tokens are used to decide whether to retrieve information. This mechanism enables the model to adjust to different tasks, such as retrieving less for creative activities and more for factual ones. When generating text, reflection tokens also help the model adhere to particular guidelines. They can act either as hard constraints that set clear boundaries or as soft preferences that guide word choice, which makes the model’s responses more flexible and appropriate for various contexts.
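One way to picture this task-dependent behavior is a threshold on the model’s confidence that retrieval is needed. The sketch below is an illustration under stated assumptions, not the paper’s implementation: the function should_retrieve, the probability inputs and both threshold values are hypothetical.

```python
def should_retrieve(p_yes: float, p_no: float, threshold: float) -> bool:
    """Trigger retrieval when the normalized probability of a 'yes' retrieval
    token exceeds the threshold."""
    total = p_yes + p_no
    if total == 0:
        return False  # No signal either way: fall back to plain generation
    return (p_yes / total) > threshold


# Hypothetical thresholds: a lower value retrieves more often (fact-heavy tasks),
# a higher value retrieves less often (creative tasks).
FACTUAL_THRESHOLD = 0.2
CREATIVE_THRESHOLD = 0.8

p_yes, p_no = 0.45, 0.55  # Made-up token probabilities for one query
print(should_retrieve(p_yes, p_no, FACTUAL_THRESHOLD))
print(should_retrieve(p_yes, p_no, CREATIVE_THRESHOLD))
```

With the same model confidence, the factual setting retrieves and the creative setting does not, which is the kind of per-task adjustment described above.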

5. Training the self-RAG

During training, reflection tokens are inserted into the training data based on evaluations made by a critic model. This approach keeps self-RAG training efficient by allowing the model to learn how to judge its own outputs and decide when it needs to look up information. As a result, the model becomes better at producing accurate, controlled and high-quality responses.

In the experiments reported in the paper mentioned previously, self-RAG outperforms many standard retrieval-augmented and instruction-tuned baselines across various tasks, including open-domain question answering, reasoning and fact verification. It improves factuality and citation accuracy by using self-reflection tokens and on-demand retrieval, and the paper reports that it matches or outperforms OpenAI’s ChatGPT on several of these tasks.

In this tutorial, you’ll learn how to build a robust self-reflective RAG agent by using IBM® Granite® models on watsonx® and LangGraph. Other models and tools, such as ChatGPT, Llama 2, LlamaIndex or LangChain, can also be used to build complex RAG flows. However, this tutorial focuses on the multimodal models available through IBM. These models handle both text and images, and their enterprise-grade design supports secure deployment, governance and scalability. These features make Granite well suited for building reliable, production-ready RAG systems that can handle complex data and maintain high standards of trust and performance.

Use case: Building a self-RAG query agent over multimodal documents

This tutorial demonstrates how to build a self-RAG agent designed to answer complex, multifaceted queries over internal knowledge bases that include both text and visual data. The agent analyzes PDF documents that include technical guidelines and survey data. The tutorial guides you through implementing a self-RAG workflow that:
• Creates a multimodal knowledge base: Extracts text and images from PDFs, uses a vision LLM (granite-vision-3.3-2b) to generate descriptive captions for the images and creates embeddings for both the text and the captions to enable semantic retrieval. A language model (granite-3-3-8b-instruct) then answers queries over this knowledge base.
• Generates and reflects: Creates an answer segment, adds reflection tokens (such as ISREL, ISSUP and ISUSE) and evaluates its own output quality and factual accuracy.
• Executes self-correction: The LangGraph workflow extends the standard self-RAG approach by using a critique score derived from the reflection tokens to guide its next steps. When the score is low, the agent retrieves more relevant context before generating the next segment, helping produce a higher-quality final output.
• Provides segmented answers: Produces thorough and traceable responses by generating complex answers as a sequence of factually validated chunks.

Prerequisites

You need an IBM Cloud® account to create a watsonx.ai® project. Ensure that you have access to both your watsonx API Key and Project ID.

Steps

Step 1. Set up your environment

While you can choose from several tools, this tutorial walks you through how to set up an IBM account to use a Jupyter Notebook.

1. Log in to watsonx.ai by using your IBM Cloud account.
2. Create a watsonx.ai project. You can get your project ID from within your project: click the Manage tab, then copy the project ID from the Details section of the General page. You need this ID for this tutorial.
3. Create a Jupyter Notebook. This step opens a notebook environment where you can copy the code from this tutorial. Alternatively, you can download this notebook to your local system and upload it to your watsonx.ai project as an asset.

To view more Granite tutorials, check out the IBM Granite Community. This tutorial is also available on GitHub.

Note: You can run the multimodal self-RAG tutorial entirely on a local CPU system by adapting it to use local resources instead of remote cloud services. You can initialize the Granite instruct model (the 3.2 2B version) directly from Hugging Face by using the transformers library. For data handling, save your PDF files on your local system and read them into your Jupyter Notebook environment by using their local file path, bypassing the need for IBM Cloud Object Storage. To handle the complex reasoning and self-critique, the larger remote Granite 3.3-8B model can be replaced by an open source LLM hosted locally on a dedicated server. This setup requires installing local Python dependencies such as langgraph, faiss-cpu, sentence-transformers and pymupdf, which provide the RAG logic, vector store, embeddings and PDF parsing, respectively. Models can be configured for CPU operation by explicitly setting the device to “cpu” and adjusting the floating-point data type. This configuration helps manage memory usage and avoid the crashes that are common when running large models on typical desktop hardware.

Step 2. Set up watsonx.ai runtime service and API key

1. Create a watsonx.ai Runtime service instance (choose the Lite plan, which is a free instance).
2. Generate an application programming interface (API) key.
3. Associate the watsonx.ai Runtime service with the project that you created in watsonx.ai.

Step 3. Installation of the packages

To build and orchestrate this multimodal self-reflective RAG agent, we require a comprehensive set of libraries. Install langgraph to define the core state machine that orchestrates the self-correction loop based on critique ratings. To integrate IBM Granite LLMs and embeddings from the watsonx platform, install langchain-ibm and ibm-watsonx-ai. For quick retrieval, install faiss-cpu, which provides the index behind the vector store. We use deep learning libraries such as torch and the Hugging Face transformers library to load and run the granite-vision-3.3-2b model. To extract and process the text and images from our PDF documents, pillow and pymupdf are essential. Lastly, ibm-cos-sdk is included to access the raw data in Cloud Object Storage.

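# Install packages
# langchain-community and langchain-text-splitters are listed explicitly because
# later steps import langchain_community.vectorstores and langchain_text_splitters.

!pip install -U "transformers>=4.50.0" "huggingface_hub>=0.26.2" \
    torch torchvision torchaudio \
    langgraph faiss-cpu Pillow requests tqdm pymupdf pydantic \
    langchain-ibm ibm-watsonx-ai ibm-cos-sdk sentence-transformers \
    langchain-community langchain-text-splitters

print("Required packages installed.")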

Note: No GPU is required, but execution can be slower on CPU-based systems.

Step 4. Import required libraries

Next, import all the necessary modules to set up the fundamental tools for managing the multimodal components, processing documents, coordinating the RAG workflow and connecting to IBM watsonx.

# Core Libraries Import

import os
import getpass
import torch
import re
from pathlib import Path
from typing import List, Dict, Any, TypedDict
import gc

# LangGraph / LangChain Core
from langgraph.graph import StateGraph, END, START
from langchain_core.documents import Document
from langchain_ibm import WatsonxLLM, WatsonxEmbeddings
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames

# Vector store + text utilities
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter

# General utils
from tqdm import tqdm
from PIL import Image
import fitz  # PyMuPDF for PDFs
import numpy as np
import io  # Ensure io is imported

print("Core libraries imported successfully.")

# Set up device for Vision Model
HF_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"PyTorch device: {HF_DEVICE}")

• Multimodal context: This tutorial uses a vision model and libraries such as fitz to process both text and visual data into a unified context. This approach goes beyond simple text-based RAG by enabling the agent to retrieve richer information and ground its answers in complex documents.
• Self-correction loop: The system uses LangGraph (StateGraph) to build a self-reflective RAG agent. This approach allows the LLM to critique its own output for relevance and accuracy, and then automatically initiate a correction cycle by querying the vector store again, which helps reduce hallucinations.
• Production-ready integration: The tutorial demonstrates a stack that integrates enterprise LLMs (such as Granite) accessed through an external application programming interface (API) or Hugging Face, depending on the setup. The stack also includes efficient vector storage (FAISS) and streamlined RAG logic, which makes it a reasonable starting point for real-world deployment.

Step 5. Load watsonx credentials

This step prepares your environment to securely connect to the IBM watsonx platform, allowing you to use the hosted Granite LLMs and embeddings.

# Load Watsonx Credentials

WML_URL = "https://us-south.ml.cloud.ibm.com"

# Securely input Watsonx credentials
WML_API_KEY = getpass.getpass("Enter Watsonx API Key: ")
PROJECT_ID = input("Enter Watsonx Project ID: ")

# Set environment variables for langchain-ibm
os.environ["WATSONX_APIKEY"] = WML_API_KEY
os.environ["WATSONX_PROJECT_ID"] = PROJECT_ID

print("Watsonx credentials loaded.")

Step 6. Initialize models

This critical step configures the three distinct models required for our multimodal self-RAG agent.

# Initialize Models

from transformers import AutoProcessor, AutoModelForVision2Seq
from huggingface_hub import hf_hub_download

# LLM: Granite-3-3-8B-Instruct (Generator & Critic)
qa_llm = WatsonxLLM(
    model_id="ibm/granite-3-3-8b-instruct",
    url=WML_URL,
    apikey=WML_API_KEY,
    project_id=PROJECT_ID,
    params={
        GenTextParamsMetaNames.MAX_NEW_TOKENS: 512,
        GenTextParamsMetaNames.TEMPERATURE: 0.1,
        GenTextParamsMetaNames.TOP_P: 0.9,
        GenTextParamsMetaNames.REPETITION_PENALTY: 1.05,
    },
)
print("Granite-3-3-8B-Instruct initialized for reasoning, QA, and self-critique.")

# Embedding Model: Granite-embedding-278m-multilingual
embeddings_model = WatsonxEmbeddings(
    model_id="ibm/granite-embedding-278m-multilingual",
    url=WML_URL,
    apikey=WML_API_KEY,
    project_id=PROJECT_ID,
)
print("Granite-embedding-278m-multilingual initialized for retrieval.")

# Vision Model: Granite-Vision-3.3-2B
try:
    print("Loading Granite Vision model in Bfloat16 for memory efficiency...")
    vision_model_id = "ibm-granite/granite-vision-3.3-2b"

    hf_processor = AutoProcessor.from_pretrained(vision_model_id)

    hf_vision_model = AutoModelForVision2Seq.from_pretrained(
        vision_model_id,
        torch_dtype=torch.bfloat16  # Roughly halves weight memory compared with float32
    ).to(HF_DEVICE)
    hf_vision_model.eval()

    print("Granite-Vision-3.3-2B initialized successfully with Bfloat16.")
except Exception as e:
    print(f"Vision model load failed: {e}")

print(f"Device available: {HF_DEVICE}")
print("All Watsonx + Vision models ready.")

This configuration will:
• Initialize the granite-3-3-8b-instruct model to function as both the primary generator and the self-critic that produces the reflection tokens (ISREL, ISSUP and ISUSE). For the self-critique loop, the parameters are set conservatively (for example, a temperature of 0.1) to favor factual, stable and near-deterministic answers.
• Initialize the granite-embedding-278m-multilingual model. This model generates the text embeddings that enable semantic search and retrieval in the FAISS vector store.
• Load the granite-vision-3.3-2b model locally by using the transformers library. This model creates text captions for the images extracted from the PDF documents.

Step 7. PDF data retrieval from Cloud Object Storage

This step retrieves the source dataset from IBM Cloud Object Storage into the memory of your execution environment. This process is necessary before any text splitting or multimodal analysis can begin. For this tutorial, we uploaded two PDF files to a Cloud Object Storage bucket.

# PDF retrieval from IBM Cloud Object Storage
import io
import ibm_boto3
# ibm-cos-sdk ships its own botocore fork, so Config is imported from ibm_botocore
from ibm_botocore.client import Config

# @hidden_cell
# The following code accesses a file in your IBM Cloud Object Storage. It includes your credentials.
# You might want to remove those credentials before you share the notebook.

cos_client = ibm_boto3.client(
    service_name='s3',
    ibm_api_key_id='your_api_key_id',
    ibm_auth_endpoint="https://iam.cloud.ibm.com/identity/token",
    config=Config(signature_version='oauth'),
    endpoint_url='https://s3.direct.us-south.cloud-object-storage.appdomain.cloud',
)

bucket = 'bucket_key'
pdf_keys = [
    'ICH_E6(R3)_Guideline.pdf',
    'inspection_survey.pdf'
]

def read_cos_pdf(bucket, key):
    """Read a PDF from IBM COS into bytes (streamed in chunks)."""
    print(f"Downloading {key} ...")
    response = cos_client.get_object(Bucket=bucket, Key=key)
    body = response['Body']
    data = io.BytesIO()
    while True:
        chunk = body.read(10 * 1024 * 1024)  # 10 MB chunks
        if not chunk:
            break
        data.write(chunk)
    data.seek(0)
    print(f"Finished downloading {key} ({data.getbuffer().nbytes / (1024*1024):.2f} MB)")
    return data.read()

# Loop through all PDFs and download
pdf_files = {}
for key in pdf_keys:
    pdf_files[key] = read_cos_pdf(bucket, key)

print(f"All {len(pdf_files)} PDFs downloaded successfully.")
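If you follow the local setup described in the note in step 1, the same pdf_files dictionary can be filled from disk instead of Cloud Object Storage. The following is a minimal sketch under that assumption; the helper name load_local_pdfs and the folder name in the usage comment are hypothetical.

```python
from pathlib import Path
from typing import Dict


def load_local_pdfs(directory: str) -> Dict[str, bytes]:
    """Read every .pdf file in a directory into memory, keyed by file name."""
    pdf_bytes: Dict[str, bytes] = {}
    for path in sorted(Path(directory).glob("*.pdf")):
        pdf_bytes[path.name] = path.read_bytes()
    return pdf_bytes


# Example usage (hypothetical folder name):
# pdf_files = load_local_pdfs("./pdfs")
```

Because the result maps file names to raw bytes, later steps that iterate over pdf_files.items() would not need to change.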

Step 8. Multimodal PDF parsing and captioning

This step is crucial for transforming our raw PDF documents into a multimodal, searchable knowledge base for the self-RAG agent.

# Multi-Modal PDF Parsing and Captioning
import os
import pickle
import io
from PIL import Image
from langchain_core.documents import Document
import fitz  # PyMuPDF

def extract_and_caption_pdf(filename: str, pdf_content: bytes) -> List[Document]:
    """Extracts text and images from in-memory PDF content, captions images, and returns LangChain Documents."""
    print(f"\nProcessing {filename}...", flush=True)

    # Open the PDF from the in-memory byte stream
    doc = fitz.open(stream=pdf_content, filetype="pdf")
    all_content = []

    # 1. Extract Text Chunks
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    for i, page in enumerate(doc):
        text = page.get_text()
        chunks = text_splitter.split_text(text)
        for j, chunk in enumerate(chunks):
            doc_metadata = {"source": filename, "page": i + 1, "chunk_id": f"P{i+1}-T{j}"}
            all_content.append(Document(page_content=chunk, metadata=doc_metadata))

    # 2. Extract and Caption Images
    # Only the survey PDF is treated as image-bearing in this tutorial
    if 'inspection_survey' in filename.lower():
        print(f" -> {filename} identified as image-containing. Beginning image extraction...", flush=True)

        for i, page in enumerate(doc):
            image_list = page.get_images(full=True)
            for j, img_info in enumerate(image_list):
                try:
                    xref = img_info[0]
                    base_image = doc.extract_image(xref)
                    image_bytes = base_image["image"]

                    # Defensive Image Loading and Normalization
                    img_stream = io.BytesIO(image_bytes)
                    image = Image.open(img_stream)

                    # Convert to RGB to fix 'Unable to infer channel dimension' errors
                    if image.mode != 'RGB':
                        image = image.convert('RGB')

                    # Memory Optimization (Resizing)
                    MAX_DIM = 1024
                    if max(image.size) > MAX_DIM:
                        image.thumbnail((MAX_DIM, MAX_DIM), Image.Resampling.LANCZOS)

                    # --- Captioning ---
                    print(f" -> Captioning image {j+1} on page {i+1}...", flush=True)

                    conversation = [
                        {
                            "role": "user",
                            "content": [
                                {"type": "image", "image": image},
                                {"type": "text", "text": "Describe this image, chart, or diagram in detail. Summarize its key findings or data points."},
                            ],
                        },
                    ]

                    # Apply chat template and generate
                    inputs = hf_processor.apply_chat_template(
                        conversation,
                        add_generation_prompt=True,
                        tokenize=True,
                        return_dict=True,
                        return_tensors="pt"
                    ).to(HF_DEVICE)

                    # Use Bfloat16 for input tensors to match the model's dtype
                    if hf_vision_model.dtype == torch.bfloat16:
                        inputs = {k: v.to(torch.bfloat16) if v.is_floating_point() else v for k, v in inputs.items()}

                    output = hf_vision_model.generate(**inputs, max_new_tokens=256)
                    caption = hf_processor.decode(output[0], skip_special_tokens=True).strip()

                    # Create document from caption
                    caption_doc = f"IMAGE CAPTION (Source: {filename}, Page {i+1}, Image {j+1}): {caption}"
                    img_metadata = {"source": filename, "page": i + 1, "chunk_id": f"P{i+1}-I{j}", "type": "image_caption"}
                    all_content.append(Document(page_content=caption_doc, metadata=img_metadata))

                    # Aggressive Memory Clearing
                    del inputs
                    del output
                    torch.cuda.empty_cache()
                    gc.collect()

                except Exception as e:
                    print(f" Error processing image on page {i+1}, image {j+1}: {e}", flush=True)
                    # Clear memory even on error
                    torch.cuda.empty_cache()
                    gc.collect()
                    continue

    return all_content


# Execution of the Multi-modal Parsing (with caching)

CACHE_FILE = 'multimodal_documents_cache.pkl'
all_documents = []

if os.path.exists(CACHE_FILE):
    # Load from Cache
    print(f"\nCache file found: {CACHE_FILE}. Loading documents from cache...", flush=True)
    try:
        with open(CACHE_FILE, 'rb') as f:
            all_documents = pickle.load(f)
        print("Documents successfully loaded from cache. Skipping multi-modal parsing.", flush=True)
    except Exception as e:
        # Fallback if the cache file is corrupted
        print(f"Error loading cache file: {e}. Falling back to full parsing.", flush=True)
        os.remove(CACHE_FILE)  # Delete bad cache
        all_documents = []

if not all_documents:
    # Run Expensive Parsing and Save to Cache.
    # This branch also runs when the cache was unreadable, so the fallback actually re-parses.
    print("\nNo usable cache. Running Multi-Modal PDF Parsing and Captioning...", flush=True)

    # Assumes the 'pdf_files' dictionary is populated by the COS retrieval step
    for filename, content in pdf_files.items():
        all_documents.extend(extract_and_caption_pdf(filename, content))

    print(f"\nFinished parsing. Total documents created: {len(all_documents)}", flush=True)

    # Save the results
    try:
        with open(CACHE_FILE, 'wb') as f:
            pickle.dump(all_documents, f)
        print(f"Successfully saved all {len(all_documents)} documents to {CACHE_FILE}.", flush=True)
    except Exception as e:
        print(f"WARNING: Could not save cache file {CACHE_FILE}: {e}", flush=True)

print(f"\nTotal documents (text chunks + image captions) available: {len(all_documents)}", flush=True)

This parsing will:
• Define the extraction function and use fitz to pull both text and embedded image bytes from structured documents, a task that simple text readers often fail at.
• Pass the extracted images and a descriptive prompt to the locally loaded Granite vision model, which is the step that makes the pipeline multimodal. By converting images into descriptive text captions, we make visual information searchable through the standard text embedding model. This mechanism ensures that the agent is not “blind” to nontextual context and therefore improves the completeness of the knowledge base.
• Implement caching logic to store the results, so that the time-consuming and computationally demanding captioning process does not have to be repeated. Storing the processed knowledge base speeds up development and repeated execution.
• Ensure that the final knowledge base gives the self-reflective agent full context that includes both textual and visual data. This is the main objective of the step, because it gives the later self-reflective retrieval the foundation it needs to be precise and well grounded.
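The caching idea in the third point can be isolated into a small reusable helper. The sketch below is illustrative only: load_or_compute and its arguments are hypothetical names, and it assumes the cached object is picklable. As with any pickle file, load only caches that you created yourself.

```python
import pickle
from pathlib import Path
from typing import Any, Callable


def load_or_compute(cache_path: str, compute: Callable[[], Any]) -> Any:
    """Return the cached result if it can be read; otherwise compute and cache it."""
    path = Path(cache_path)
    if path.exists():
        try:
            with path.open("rb") as f:
                return pickle.load(f)
        except Exception:
            # Discard an unreadable cache and fall through to recompute
            path.unlink()
    result = compute()
    with path.open("wb") as f:
        pickle.dump(result, f)
    return result


# Example usage (parse_all_pdfs is a hypothetical wrapper around the parsing loop):
# all_documents = load_or_compute("multimodal_documents_cache.pkl", parse_all_pdfs)
```

Passing the expensive work in as a callable means it runs only on a cache miss or after a corrupted cache has been discarded.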

Step 9. Indexing and retriever setup

This step completes the preparation of the multimodal knowledge base by indexing all processed document chunks into an efficient, searchable vector store that forms the basis for the agent’s initial retrieval capability.

# Indexing and Retriever Setup
from langchain_ibm import WatsonxEmbeddings
from langchain_community.vectorstores import FAISS

print("\nStarting Vector Store Creation", flush=True)

try:
    # Create the FAISS Vector Store
    vectorstore = FAISS.from_documents(
        documents=all_documents,
        embedding=embeddings_model
    )
    print(f"Vector Store created successfully with {len(all_documents)} documents.", flush=True)

    # Create the Retriever
    # We set 'k=5' to retrieve the top 5 most similar documents for any given query.
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
    print("Retriever configured (k=5). Ready for RAG.", flush=True)

except Exception as e:
    # This captures errors like embedding failures.
    print(f"Vector Store creation failed: {e}", flush=True)

This configuration plays a key role in preparing the retrieval layer for the self-RAG workflow:
• It builds a vector store by using FAISS, which is well known for its speed and scalability when handling dense vector indexes. This step ensures that similarity searches run quickly, which is critical for maintaining a responsive RAG pipeline.
• It transforms the multimodal knowledge base into vector representations, allowing the retriever to match user queries by meaning rather than relying on exact keyword overlap.
• It tunes context delivery by retrieving the five most similar documents (k=5) for each query, balancing coverage against the size of the model’s context window.
• It establishes a single, consistent knowledge source that the self-RAG agent can depend on for factual grounding, an essential element of any trustworthy retrieval augmented system.
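To make the idea of matching by meaning concrete, the sketch below ranks a few toy vectors against a query vector and keeps the top k. It uses cosine similarity in plain Python for readability; the FAISS index built above may rank by a different distance metric, and the function names and vectors here are hypothetical.

```python
import math
from typing import List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vec: List[float], doc_vecs: List[List[float]], k: int = 5) -> List[Tuple[int, float]]:
    """Return (document index, similarity) pairs for the k most similar vectors."""
    scored = [(i, cosine_similarity(query_vec, v)) for i, v in enumerate(doc_vecs)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy 2-D "embeddings" for illustration; real embeddings have hundreds of dimensions.
query = [1.0, 0.0]
docs = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [-1.0, 0.0]]
print(top_k(query, docs, k=2))
```

A brute-force scan like this is linear in the number of documents; a dedicated index such as FAISS exists to make the same kind of lookup fast at scale.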

Step 10. LangGraph state and core self-RAG logic

This step sets up the main sections of the self-RAG workflow. The agent state tracks the entire process. The LangGraph node functions manage the flexible, self-correcting logic.

# LangGraph state and core self-RAG logic

from typing import TypedDict, List
from langchain_core.documents import Document
from langgraph.graph import StateGraph, END
import re
# Assumes qa_llm and retriever from the earlier steps; calculate_score is defined below


# Define the Agent State (Schema)
class AgentState(TypedDict):
    """Represents the state of the Self-RAG agent."""
    query: str  # The original user query
    retrieved_docs: List[Document]  # Documents retrieved from the vector store
    generation_history: List[str]  # History of generated segments
    critique_score: float  # The critique score of the last generated segment
    segment_count: int  # Counter for generated segments
    finish_generation: bool  # Flag to stop the generation loop


# LangGraph Node Functions (SELF-RAG Logic)
MAX_SEGMENTS = 10
SCORE_THRESHOLD = 2.5  # Used in evaluate_critique; the maximum possible score is 5.0

def calculate_score(isrel_val: str, issup_val: str, isuse_val: int) -> float:
    """Calculates the combined weighted score for a segment (Soft Constraint)."""
    W_ISSUP = 3.0
    W_ISREL = 1.5
    W_ISUSE = 0.5

    score_rel = 1.0 if isrel_val == "Relevant" else 0.0

    if "Fully Supported" in issup_val:
        score_sup = 1.0
    elif "Partially" in issup_val:
        score_sup = 0.5
    else:
        score_sup = 0.0

    # Map the 1-5 utility rating onto the range 0.0-1.0
    score_use = (isuse_val - 1) / 4.0

    total_score = (W_ISREL * score_rel) + (W_ISSUP * score_sup) + (W_ISUSE * score_use)
    return total_score


def initial_decision(state: AgentState) -> AgentState:
    """Initial decision on whether to retrieve based on the query type."""
    query = state["query"]

    prompt = f"""
    You are an expert self-reflecting LLM. Your task is to determine if external knowledge is required to answer the following query accurately.
    - If knowledge is required, output the token: <|Retrieve=Yes|>
    - If the query is open-ended or based on common knowledge, output the token: <|Retrieve=No|>

    Query: "{query}"

    Decision Token:
    """

    response = qa_llm.invoke(prompt)

    if "<|Retrieve=Yes|>" in response:
        print("Decision: Retrieval required.", flush=True)
        return {"query": query, "retrieved_docs": [], "critique_score": 0.0, "segment_count": 0, "finish_generation": False}
    else:
        print("Decision: No retrieval required for initial generation.", flush=True)
        # A placeholder document marks the "no retrieval" path for the routing edge below
        return {"query": query, "retrieved_docs": [Document(page_content="No documents retrieved.")], "critique_score": 0.0, "segment_count": 0, "finish_generation": False}


def retrieve_docs(state: AgentState) -> AgentState:
    """Retrieves documents based on the current query or the last generated segment."""
    query = state["query"]

    if state.get("generation_history"):
        search_query = state["generation_history"][-1]
    else:
        search_query = query

    print(f"Retrieving documents for: '{search_query[:50]}...'", flush=True)
    docs = retriever.invoke(search_query)

    return {"retrieved_docs": docs}


def generate_segment(state: AgentState) -> AgentState:
    """Generates the next answer segment and self-reflects using critique tokens."""
    query = state["query"]
    history = state.get("generation_history", [])

    docs_context = "\n---\n".join([f"Source ({d.metadata.get('chunk_id')}): {d.page_content}" for d in state["retrieved_docs"]])
    history_context = "\n".join(history)

    prompt = f"""
    You are a SELF-RAG agent using the IBM Granite model. Your goal is to generate one accurate, concise segment of an answer.

    INSTRUCTION: Generate a comprehensive, multi-segment answer to the user's query.
    1. CONTEXT: Use the provided document segments (which include text and image captions) to answer the question accurately.
    2. SEGMENTATION: Only use the <|END|> token when the answer is fully comprehensive and detailed, and you have no more relevant information to add.
    3. REFLECTION: After generating the segment, immediately append these key-value reflection tokens:
    - ISREL: <|ISREL=Relevant|> or <|ISREL=Irrelevant|>
    - ISSUP: <|ISSUP=Fully Supported|> or <|ISSUP=Partially Supported|> or <|ISSUP=No Support|>
    - ISUSE: <|ISUSE=N|> (where N is the overall quality/utility score from 1 to 5, 5 is best).

    CURRENT QUERY: "{query}"

    HISTORY SO FAR: "{history_context}"

    RETRIEVED CONTEXT (Multi-Modal: text chunks and image captions):
    {docs_context}

    ---

    Generate the NEXT SEGMENT and REFLECTION TOKENS. End the entire generation with <|END|> if the answer is complete.
    """

    print(f"Generating Segment {state['segment_count'] + 1}...", flush=True)
    full_response = qa_llm.invoke(prompt)

    isrel = re.search(r"<\|ISREL=(.+?)\|>", full_response)
    issup = re.search(r"<\|ISSUP=(.+?)\|>", full_response)
    isuse = re.search(r"<\|ISUSE=(\d+)\|>", full_response)

    # Fall back to the most pessimistic value when a token is missing
    isrel_val = isrel.group(1).strip() if isrel else "Irrelevant"
    issup_val = issup.group(1).strip() if issup else "No Support"
    isuse_val = int(isuse.group(1).strip()) if isuse and isuse.group(1).isdigit() else 1

    # Strip every complete <|...|> control token so that only the answer text remains.
    # Removing the prefixes and the "|>" suffix separately would leave the token
    # values (for example "Relevant") behind in the segment.
    segment = re.sub(r"<\|.*?\|>", "", full_response).strip()

    new_history = history + [segment]

    print(f" -> ISREL: {isrel_val}, ISSUP: {issup_val}, ISUSE: {isuse_val}", flush=True)

    return {
        "generation_history": new_history,
        "segment_count": state["segment_count"] + 1,
        "finish_generation": "<|END|>" in full_response,
        "critique_score": calculate_score(isrel_val, issup_val, isuse_val),
        "retrieved_docs": state["retrieved_docs"]
    }


def evaluate_critique(state: AgentState) -> str:
    """Conditional edge function to determine the next step based on critique score."""
    score = state["critique_score"]
    segment_count = state["segment_count"]
    is_finished = state["finish_generation"]

    if is_finished or segment_count >= MAX_SEGMENTS:
        return "end"

    if score < SCORE_THRESHOLD:
        print(f"Critique: Low score ({score:.2f}) observed. FORCING RE-RETRIEVAL for next segment.", flush=True)
        return "retrieve"

    print(f"Critique: High score ({score:.2f}) observed. Continuing generation.", flush=True)
    return "continue"


def finalize_answer(state: AgentState) -> AgentState:
    """Compiles the final answer."""
    final_answer = "\n".join(state["generation_history"])
    print("\n--- FINAL ANSWER ---", flush=True)
    print(final_answer, flush=True)
    return state


# Build and Compile the LangGraph Workflow

print("\nBuilding and Compiling LangGraph Workflow", flush=True)

workflow = StateGraph(AgentState)

# Add Nodes (Function Calls)
workflow.add_node("initial_decision", initial_decision)
workflow.add_node("retrieve_docs", retrieve_docs)
workflow.add_node("generate_segment", generate_segment)
workflow.add_node("finalize_answer", finalize_answer)


# Define Edges (Flow Control)
workflow.set_entry_point("initial_decision")

# Edge 1: Decide between retrieval or initial generation
workflow.add_conditional_edges(
    "initial_decision",
    lambda state: "retrieve" if not state["retrieved_docs"] else "generate",
    {
        "retrieve": "retrieve_docs",
        "generate": "generate_segment",
    }
)

# Edge 2: After retrieval, always generate a segment
workflow.add_edge("retrieve_docs", "generate_segment")


# Edge 3: The core loop - Evaluate the critique score to determine the next action
workflow.add_conditional_edges(
    "generate_segment",
    evaluate_critique,
    {
        "retrieve": "retrieve_docs",
        "continue": "generate_segment",
        "end": "finalize_answer",
    }
)

# Edge 4: End the workflow
workflow.add_edge("finalize_answer", END)


# Compile the Graph
app = workflow.compile()
print("LangGraph workflow compiled successfully (object named 'app').", flush=True)

This code serves several purposes:
• The agent keeps a core memory (its state) that stores its evolving response, the evidence it has retrieved and its internal feedback. Storing this context across steps lets the agent's logic refine its reasoning dynamically.
• Before producing any segment, the agent first determines whether adequate factual grounding is present. If the existing context is deemed incomplete, the agent seeks stronger, more supportive information so that the generated response is accurate and pertinent.
• Alongside each generated segment, the model issues internal reflection tokens (ISREL, ISSUP and ISUSE) that rate the output's relevance, factual support and overall usefulness. These signals are then combined into a single critique score, giving the agent a measurable way to judge its own output.
• Based on the critique score, the agent then decides whether to retrieve new evidence, continue generating or finalize its answer. This iterative loop makes the system more resilient: a low-scoring segment triggers re-retrieval before the next segment, which helps maintain factual precision over multiple reasoning rounds.
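The scoring and routing described in the last two points can be sketched in isolation. This is a minimal illustration, not the tutorial's own calculate_score: the helper names (parse_reflection_tokens, combine_score, next_step), the weights, the 1 to 5 ISUSE scale and the 0.6 threshold are all assumptions chosen for the example.

```python
import re

# Assumed label-to-score mappings and weights (hypothetical values for illustration).
ISREL_SCORES = {"Relevant": 1.0, "Irrelevant": 0.0}
ISSUP_SCORES = {"Fully Supported": 1.0, "Partially Supported": 0.5, "No Support": 0.0}
WEIGHTS = {"isrel": 0.3, "issup": 0.5, "isuse": 0.2}
THRESHOLD = 0.6  # assumed cut-off below which re-retrieval is forced


def parse_reflection_tokens(text: str) -> dict:
    """Pull the three reflection tokens out of a response, with pessimistic defaults."""
    isrel = re.search(r"<\|ISREL=(.+?)\|>", text)
    issup = re.search(r"<\|ISSUP=(.+?)\|>", text)
    isuse = re.search(r"<\|ISUSE=(\d+)\|>", text)
    return {
        "isrel": isrel.group(1).strip() if isrel else "Irrelevant",
        "issup": issup.group(1).strip() if issup else "No Support",
        "isuse": int(isuse.group(1)) if isuse else 1,
    }


def combine_score(isrel: str, issup: str, isuse: int) -> float:
    """Weighted sum in [0, 1]; ISUSE is assumed to be on a 1 to 5 scale."""
    usefulness = (min(max(isuse, 1), 5) - 1) / 4  # map 1..5 onto 0..1
    return (
        WEIGHTS["isrel"] * ISREL_SCORES.get(isrel, 0.0)
        + WEIGHTS["issup"] * ISSUP_SCORES.get(issup, 0.0)
        + WEIGHTS["isuse"] * usefulness
    )


def next_step(score: float, finished: bool) -> str:
    """Mirror the three routing outcomes: end, retrieve or continue."""
    if finished:
        return "end"
    return "retrieve" if score < THRESHOLD else "continue"


if __name__ == "__main__":
    response = "GCP protects trial subjects. <|ISREL=Relevant|><|ISSUP=Partially Supported|><|ISUSE=4|>"
    tokens = parse_reflection_tokens(response)
    score = combine_score(**tokens)
    print(tokens, round(score, 2), next_step(score, finished=False))
```

Weighting factual support most heavily is one possible design choice; a different weighting or threshold would change how often the agent re-retrieves.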

Step 11. LangGraph state and core self-RAG logic

This last step runs the entire self-RAG workflow.

Sample query 1

# Execute the LangGraph Workflow

# 1. Define the Query
# This query is designed to require information from both documents.
user_query = "What is the primary purpose of the ICH E6(R3) Guideline and what are the key findings from the EFPIA 2024 inspection survey regarding remote inspections?"

# 2. Define the Initial Input State
# The generation_history must start empty.
inputs = {
    "query": user_query,
    "generation_history": []
}

print("\n--- STARTING LANGGRAPH EXECUTION ---", flush=True)
print(f"Query: {user_query}\n", flush=True)

# 3. Stream the Execution
# This loop runs the graph and prints the state update after each node completes.
for step in app.stream(inputs):
    # Print the name of the node that just executed and its resulting state
    print(step, flush=True)
    print("\n--- NODE TRANSITION ---", flush=True)

print("--- LANGGRAPH EXECUTION COMPLETE ---", flush=True)

# Final Answer Extraction and Review

final_state = None
# The query and inputs are reused from Step 10
user_query = "What is the primary purpose of the ICH E6(R3) Guideline and what are the key findings from the EFPIA 2024 inspection survey regarding remote inspections?"
inputs = {"query": user_query, "generation_history": []}

print("\n--- RE-RUNNING EXECUTION FOR FINAL EXTRACTION ---", flush=True)

for step in app.stream(inputs):
    for key, value in step.items():
        # The key tells us which node just ran (e.g., 'finalize_answer')
        # The value is the state output of that node
        if key == "finalize_answer":
            final_state = value
        elif key == END:
            # If the END node is hit, the graph is finished
            final_state = value

# 2. Extract and Format the Final Answer
if final_state and "generation_history" in final_state:
    # Join all generated segments into one cohesive answer
    final_answer = "\n".join(final_state["generation_history"]).strip()

    print("\n==============================================", flush=True)
    print(" RAG PIPELINE COMPLETE", flush=True)
    print("==============================================", flush=True)
    print(f"USER QUERY:\n{user_query}\n", flush=True)
    print(f"FINAL GENERATED ANSWER ({final_state['segment_count']} segments):", flush=True)
    print("----------------------------------------------", flush=True)
    print(final_answer, flush=True)
    print("----------------------------------------------", flush=True)
else:
    print("\nEXECUTION FAILED or final state was not captured.", flush=True)
    print(f"Last recorded state: {final_state}", flush=True)

Output

The ICH E6(R3) Guideline primarily focuses on good clinical practice for design and conduct of clinical trials on medicinal products. It aims to harmonize these practices across different regions to ensure the protection of human subjects involved in clinical trials and the quality and integrity of the data generated. Regarding remote inspections, the EFPIA 2024 inspection survey reveals that while there is a trend of fewer remote inspections in the EU/EEA post-pandemic, the US shows no clear trend, with a slight decrease. The survey also highlights the potential for minimizing increased efforts through strategies like utilizing local inspectorates as leads, leveraging different time zones for document reviews, and producing one inspection report with agreed observations. However, uncertainty about return on investment and business priorities were cited as reasons for not applying in the 2024 pilot.

Sample query 2

# USER QUERY: According to EFPIA 2024 data on multiple inspections at manufacturing sites, which countries recorded the highest inspection counts per site, and what does this reveal about their regulatory significance?

# 1. Define the query and inputs for the graph execution
user_query = "According to EFPIA 2024 data on multiple inspections at manufacturing sites, which countries recorded the highest inspection counts per site, and what does this reveal about their regulatory significance?"
inputs = {"query": user_query, "generation_history": []}

# 2. Rerun stream and capture the final state
print("--- Rerunning stream to answer the new combined query ---", flush=True)

final_state = None
# This loop runs the graph and prints the state update after each node completes.
for step in app.stream(inputs):
    print(step, flush=True)
    print("\n--- NODE TRANSITION ---\n", flush=True)
    # Capture the last yielded state (which contains the compiled final history)
    for key, value in step.items():
        if key != END:
            final_state = value

# 3. Extract and Format the Final Answer
if final_state and "generation_history" in final_state:
    # Join all generated segments (which should now be clean)
    final_answer_text = "\n".join(final_state["generation_history"]).strip()

    # Run a final cleanup pass
    final_answer_text = re.sub(r'\s*(Relevant|Irrelevant)\s*(Fully Supported|Partially Supported|No Support)\s*\d', '', final_answer_text).strip()
    final_answer_text = final_answer_text.replace("<|END", "").strip()

    # 4. Present the Results
    print("\n\n#####################################################", flush=True)
    print(" FINAL SELF-RAG ANSWER ", flush=True)
    print("#####################################################\n", flush=True)

    print("--- ANSWER ---", flush=True)
    print(final_answer_text, flush=True)
    print("\n#####################################################", flush=True)

else:
    print("\nEXECUTION FAILED or final state was not captured.", flush=True)

Output

The countries with the highest inspection counts per manufacturing site, according to the EFPIA 2024 data, are Germany and Denmark, each with four multiple inspections at their sites. This indicates significant regulatory scrutiny and importance in the pharmaceutical manufacturing sector. Germany stands out with additional sites also facing inspections from Belarus, Türkiye, Russia, and the US-FDA, while Denmark has inspections from Japan, Brazil, US-FDA, Türkiye, Kenya, Chinese Taipei, and the Rep. of Korea. The high number of inspections suggests that these countries play crucial roles in global pharmaceutical manufacturing oversight, likely due to their central positions in the industry and stringent regulatory environments.

The .stream() method runs the compiled graph, represented by the app object. The initial state, which contains the detailed user_query, is passed in through the inputs dictionary. As the graph streams, each iteration of the loop yields the output of one node, selected by the graph's routing logic. Every node's output is printed as it runs, letting us watch the agent refine its reasoning in real time and build its multipart response. Once the agent either completes its multisegment answer or reaches the maximum number of segments, it produces the final output for the user question, ending with a well-supported final answer. The final extraction step reruns the full self-RAG workflow to capture that result: it executes the LangGraph and watches the streaming state updates until the finalize_answer or END node shows up. When the final state is reached, it pulls the generated segments and joins them into a grounded final answer.
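The capture pattern can be tried without a model or a compiled graph. The sketch below is illustrative only: fake_stream is a hypothetical stand-in that mimics the shape of the updates printed in this tutorial (one dictionary per step, keyed by node name), and capture_final_answer is an assumed helper name, not part of LangGraph.

```python
from typing import Dict, Iterable, Iterator, Optional


def fake_stream() -> Iterator[Dict[str, dict]]:
    """Hypothetical stand-in for app.stream(inputs): yields {node_name: state}."""
    yield {"retrieve_docs": {"generation_history": [], "segment_count": 0}}
    yield {"generate_segment": {"generation_history": ["First segment."],
                                "segment_count": 1}}
    yield {"generate_segment": {"generation_history": ["First segment.", "Second segment."],
                                "segment_count": 2}}
    yield {"finalize_answer": {"generation_history": ["First segment.", "Second segment."],
                               "segment_count": 2}}


def capture_final_answer(stream: Iterable[Dict[str, dict]],
                         final_node: str = "finalize_answer") -> Optional[str]:
    """Consume the stream once and join the segments held by the final node's state."""
    final_state = None
    for step in stream:
        for node_name, state in step.items():
            if node_name == final_node:
                final_state = state
    if not final_state or "generation_history" not in final_state:
        return None  # the final node never ran or returned no history
    return "\n".join(final_state["generation_history"]).strip()


if __name__ == "__main__":
    print(capture_final_answer(fake_stream()))
```

Because the helper only reads what the stream yields, the same loop could also print each step as it arrives, so a single pass over the stream could serve both monitoring and final extraction.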

The self-reflective retrieval augmented generation setup in this tutorial offers clear advantages over standard RAG, mainly in reliability and efficiency. Its biggest strength is improved factual accuracy and traceability, made possible by the Granite LLM running its own self-critiques with reflection tokens. These critiques produce a score that guides the workflow and enables adaptive retrieval: the model pulls new context only when a segment isn't well supported. This approach also makes it easier to work with complex, multimodal documents because image captions can be added to the vector store. The result is a more trustworthy, flexible query agent that segments its answers and checks them against the knowledge base before giving the final result.