Create a Retrieval Augmented Generation (RAG) pipeline
type: "io.kestra.plugin.ai.rag.ChatCompletion"
Examples
Chat with your data using Retrieval Augmented Generation (RAG). This flow indexes documents and uses the RAG ChatCompletion task to interact with your data using natural language prompts. It contrasts prompting the LLM with and without RAG: the RAG task retrieves embeddings stored in the KV Store and grounds its response in your data rather than hallucinating. WARNING: the Kestra KV embedding store is intended for quick prototyping only, as it stores the embedding vectors in Kestra's KV store and loads them all into memory.
id: rag
namespace: company.ai

tasks:
  - id: ingest
    type: io.kestra.plugin.ai.rag.IngestDocument
    provider:
      type: io.kestra.plugin.ai.provider.GoogleGemini
      modelName: gemini-embedding-exp-03-07
      apiKey: "{{ kv('GEMINI_API_KEY') }}"
    embeddings:
      type: io.kestra.plugin.ai.embeddings.KestraKVStore
    drop: true
    fromExternalURLs:
      - https://raw.githubusercontent.com/kestra-io/docs/refs/heads/main/content/blogs/release-0-24.md

  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: chat_without_rag
        type: io.kestra.plugin.ai.completion.ChatCompletion
        provider:
          type: io.kestra.plugin.ai.provider.GoogleGemini
        messages:
          - type: USER
            content: Which features were released in Kestra 0.24?

      - id: chat_with_rag
        type: io.kestra.plugin.ai.rag.ChatCompletion
        chatProvider:
          type: io.kestra.plugin.ai.provider.GoogleGemini
        embeddingProvider:
          type: io.kestra.plugin.ai.provider.GoogleGemini
          modelName: gemini-embedding-exp-03-07
        embeddings:
          type: io.kestra.plugin.ai.embeddings.KestraKVStore
        systemMessage: You are a helpful assistant that can answer questions about Kestra.
        prompt: Which features were released in Kestra 0.24?

pluginDefaults:
  - type: io.kestra.plugin.ai.provider.GoogleGemini
    values:
      apiKey: "{{ kv('GEMINI_API_KEY') }}"
      modelName: gemini-2.5-flash
RAG chat with a web search content retriever (answers grounded in search results)
id: rag_with_websearch_content_retriever
namespace: company.ai

tasks:
  - id: chat_with_rag_and_websearch_content_retriever
    type: io.kestra.plugin.ai.rag.ChatCompletion
    chatProvider:
      type: io.kestra.plugin.ai.provider.GoogleGemini
      modelName: gemini-2.5-flash
      apiKey: "{{ kv('GEMINI_API_KEY') }}"
    contentRetrievers:
      - type: io.kestra.plugin.ai.retriever.TavilyWebSearch
        apiKey: "{{ kv('TAVILY_API_KEY') }}"
    systemMessage: You are a helpful assistant that can answer questions about Kestra.
    prompt: What is the latest release of Kestra?
Store chat memory as a Kestra KV pair
id: chat_with_memory
namespace: company.ai

inputs:
  - id: first
    type: STRING
    defaults: Hello, my name is John and I'm from Paris

  - id: second
    type: STRING
    defaults: What's my name and where do I live?

tasks:
  - id: first
    type: io.kestra.plugin.ai.rag.ChatCompletion
    chatProvider:
      type: io.kestra.plugin.ai.provider.GoogleGemini
    embeddingProvider:
      type: io.kestra.plugin.ai.provider.GoogleGemini
      modelName: gemini-embedding-exp-03-07
    embeddings:
      type: io.kestra.plugin.ai.embeddings.KestraKVStore
    memory:
      type: io.kestra.plugin.ai.memory.KestraKVStore
      ttl: PT1M
    systemMessage: You are a helpful assistant, answer concisely
    prompt: "{{ inputs.first }}"

  - id: second
    type: io.kestra.plugin.ai.rag.ChatCompletion
    chatProvider:
      type: io.kestra.plugin.ai.provider.GoogleGemini
    embeddingProvider:
      type: io.kestra.plugin.ai.provider.GoogleGemini
      modelName: gemini-embedding-exp-03-07
    embeddings:
      type: io.kestra.plugin.ai.embeddings.KestraKVStore
    memory:
      type: io.kestra.plugin.ai.memory.KestraKVStore
    systemMessage: You are a helpful assistant, answer concisely
    prompt: "{{ inputs.second }}"

pluginDefaults:
  - type: io.kestra.plugin.ai.provider.GoogleGemini
    values:
      apiKey: "{{ kv('GEMINI_API_KEY') }}"
      modelName: gemini-2.5-flash
Classify recent Kestra releases into MINOR or PATCH using a JSON schema. Note: not all LLMs support structured outputs, or they may not support them when combined with tools like web search. This example uses Mistral, which supports structured output with content retrievers.
id: chat_with_structured_output
namespace: company.ai

tasks:
  - id: categorize_releases
    type: io.kestra.plugin.ai.rag.ChatCompletion
    chatProvider:
      type: io.kestra.plugin.ai.provider.MistralAI
      apiKey: "{{ kv('MISTRAL_API_KEY') }}"
      modelName: open-mistral-7b
    contentRetrievers:
      - type: io.kestra.plugin.ai.retriever.TavilyWebSearch
        apiKey: "{{ kv('TAVILY_API_KEY') }}"
        maxResults: 8
    chatConfiguration:
      responseFormat:
        type: JSON
        jsonSchema:
          type: object
          required: ["releases"]
          properties:
            releases:
              type: array
              minItems: 1
              items:
                type: object
                additionalProperties: false
                required: ["version", "date", "semver"]
                properties:
                  version:
                    type: string
                    description: "Release tag, e.g., 0.24.0"
                  date:
                    type: string
                    description: "Release date"
                  semver:
                    type: string
                    enum: ["MINOR", "PATCH"]
                  summary:
                    type: string
                    description: "Short plain-text summary (optional)"
    systemMessage: |
      You are a release analyst. Use the Tavily web retriever to find recent Kestra releases.
      Determine each release's SemVer category:
      - MINOR: new features, no major breaking changes (y in x.Y.z)
      - PATCH: bug fixes/patches only (z in x.y.Z)
      Return ONLY valid JSON matching the schema. No prose, no extra keys.
    prompt: |
      Find the most recent Kestra releases (within the last ~6 months).
      Output their version, release date, semver category, and a one-line summary.
Properties
chatProvider *Required Non-dynamic. One of: AmazonBedrock, Anthropic, AzureOpenAI, DeepSeek, GoogleGemini, GoogleVertexAI, MistralAI, Ollama, OpenAI
Chat model provider
prompt *Required string
User prompt
The user input for this run. May be templated from flow inputs.
chatConfiguration Non-dynamic ChatConfiguration
Default: {}
Chat configuration
contentRetrieverConfiguration Non-dynamic ChatCompletion-ContentRetrieverConfiguration
Default: {"maxResults": 3, "minScore": 0}
Content retriever configuration
contentRetrievers array
Additional content retrievers
Some content retrievers, such as web search, can also be used as tools. Using them as content retrievers ensures they are always called, whereas tools are invoked only when the LLM decides to.
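For instance, a minimal sketch (reusing the provider, model, and KV-stored keys from the examples above) that attaches Tavily web search as a content retriever, so retrieval happens on every run rather than at the model's discretion:

- id: grounded_answer
  type: io.kestra.plugin.ai.rag.ChatCompletion
  chatProvider:
    type: io.kestra.plugin.ai.provider.GoogleGemini
    modelName: gemini-2.5-flash
    apiKey: "{{ kv('GEMINI_API_KEY') }}"
  contentRetrievers:
    # Queried on every run; results are injected into the prompt context
    - type: io.kestra.plugin.ai.retriever.TavilyWebSearch
      apiKey: "{{ kv('TAVILY_API_KEY') }}"
  prompt: What is the latest release of Kestra?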
embeddingProvider Non-dynamic. One of: AmazonBedrock, Anthropic, AzureOpenAI, DeepSeek, GoogleGemini, GoogleVertexAI, MistralAI, Ollama, OpenAI
Embedding model provider
Optional. If not set, the embedding model is created from chatProvider. Ensure the chosen chat provider supports embeddings.
embeddings Non-dynamic. One of: Chroma, Elasticsearch, KestraKVStore, Milvus, MongoDBAtlas, PGVector, Pinecone, Qdrant, Redis, Weaviate
Embedding store
Optional when at least one entry is provided in contentRetrievers.
memory Non-dynamic. One of: KestraKVStore, Redis
Chat memory
Stores conversation history and injects it into context on subsequent runs.
systemMessage string
System message
Instruction that sets the assistant's role, tone, and constraints for this task.
tools Non-dynamic array
Optional tools the LLM may call to augment its response
Outputs
finishReason string
Possible values: STOP, LENGTH, TOOL_EXECUTION, CONTENT_FILTER, OTHER
Finish reason
jsonOutput object
LLM output for the JSON response format
The result of the LLM completion when the response format type is JSON, null otherwise.
outputFiles object
URIs of the generated files in Kestra's internal storage
requestDuration integer
Request duration in milliseconds
textOutput string
LLM output for the TEXT response format
The result of the LLM completion when the response format type is TEXT (default), null otherwise.
tokenUsage TokenUsage
Token usage
Definitions
PGVector Embedding Store
database *Required string
The database name
host *Required string
The database server host
password *Required string
The database password
port *Required integer | string
The database server port
table *Required string
The table to store embeddings in
type *Required object
user *Required string
The database user
useIndex boolean | string
Default: false
Whether to use an IVFFlat index
An IVFFlat index divides vectors into lists, and then searches a subset of those lists closest to the query vector. It has faster build times and uses less memory than HNSW but has lower query performance (in terms of speed-recall tradeoff).
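A minimal sketch of a PGVector store with the IVFFlat index enabled, assuming the io.kestra.plugin.ai.embeddings.PGVector type identifier (following the io.kestra.plugin.ai.embeddings naming used in the examples above); host, credentials, and table are placeholders:

embeddings:
  type: io.kestra.plugin.ai.embeddings.PGVector
  host: postgres.example.com
  port: 5432
  database: vectors
  user: kestra
  password: "{{ kv('PG_PASSWORD') }}"
  table: kestra_embeddings
  # IVFFlat builds faster and uses less memory than HNSW, at some query-performance cost
  useIndex: true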
MongoDB Atlas Embedding Store
collectionName *Required string
The collection name
host *Required string
The host
indexName *Required string
The index name
scheme *Required string
The scheme (e.g., mongodb+srv)
type *Required object
createIndex boolean | string
Create the index
database string
The database
metadataFieldNames array
The metadata field names
options object
The connection string options
password string
The password
username string
The username
Mistral AI Model Provider
apiKey *Required string
API Key
modelName *Required string
Model name
type *Required object
baseUrl string
API base URL
Model Context Protocol (MCP) Stdio client tool
command *Required array
MCP client command, as a list of command parts
type *Required object
env object
Environment variables
logEvents boolean | string
Default: false
Log events
Chroma Embedding Store
baseUrl *Required string
The database base URL
collectionName *Required string
The collection name
type *Required object
Redis Embedding Store
host *Required string
The database server host
port *Required integer | string
The database server port
type *Required object
indexName string
Default: embedding-index
The index name
Call a Kestra flow as a tool
type *Required object
description string
Description of the flow if not already provided inside the flow itself
Use it only if you define the flow in the tool definition. The LLM needs a tool description to identify whether to call it. If the flow has a description, the tool will use it. Otherwise, the description property must be explicitly defined.
flowId string
Flow ID of the flow that should be called
inheritLabels boolean | string
Default: false
Whether the flow should inherit labels from the execution that triggered it
By default, labels are not inherited. If you set this option to true, the flow execution will inherit all labels from the agent's execution.
Any labels passed by the LLM will override those defined here.
inputs object
Input values that should be passed to the flow's execution
Any inputs passed by the LLM will override those defined here.
labels array | object
Labels that should be added to the flow's execution
Any labels passed by the LLM will override those defined here.
namespace string
Namespace of the flow that should be called
revision integer | string
Revision of the flow that should be called
scheduleDate string (date-time)
Schedule the flow execution at a later date
If the LLM sets a scheduleDate, it will override the one defined here.
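A hedged sketch of calling a flow as a tool; the type identifier io.kestra.plugin.ai.tool.KestraFlow and all IDs below are assumptions for illustration only:

tools:
  # Assumed type name, for illustration; check the plugin catalog for the exact identifier
  - type: io.kestra.plugin.ai.tool.KestraFlow
    namespace: company.team
    flowId: send_report
    description: Send the weekly report to the given email address
    inputs:
      recipient: admin@example.com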
io.kestra.plugin.ai.embeddings.Elasticsearch-ElasticsearchConnection-BasicAuth
password string
Basic authorization password
username string
Basic authorization username
Model Context Protocol (MCP) SSE client tool
type *Required object
url *Required string
URL of the MCP server
headers object
Custom headers
Useful, for example, for adding authentication tokens via the Authorization header.
logRequests boolean | string
Default: false
Log requests
logResponses boolean | string
Default: false
Log responses
timeout string (duration)
Connection timeout duration
Call a Kestra runnable task as a tool
io.kestra.plugin.ai.domain.AIOutput-ToolExecution
requestArguments object
requestId string
requestName string
result string
DeepSeek Model Provider
apiKey *Required string
API Key
modelName *Required string
Model name
type *Required object
baseUrl string
Default: https://api.deepseek.com/v1
API base URL
Pinecone Embedding Store
apiKey *Required string
The API key
cloud *Required string
The cloud provider
index *Required string
The index
region *Required string
The cloud provider region
type *Required object
namespace string
The namespace (default will be used if not provided)
io.kestra.plugin.ai.domain.AIOutput-AIResponse
completion string
Generated text completion
The result of the text completion
finishReason string
Possible values: STOP, LENGTH, TOOL_EXECUTION, CONTENT_FILTER, OTHER
Finish reason
id string
Response identifier
requestDuration integer
Request duration in milliseconds
tokenUsage TokenUsage
Token usage
io.kestra.plugin.ai.domain.ChatConfiguration-ResponseFormat
jsonSchema object
JSON Schema (used when type = JSON)
Provide a JSON Schema describing the expected structure of the response. In Kestra flows, define the schema in YAML (it is still a JSON Schema object). Example (YAML):
responseFormat:
  type: JSON
  jsonSchema:
    type: object
    required: ["category", "priority"]
    properties:
      category:
        type: string
        enum: ["ACCOUNT", "BILLING", "TECHNICAL", "GENERAL"]
      priority:
        type: string
        enum: ["LOW", "MEDIUM", "HIGH"]
Note: Provider support for strict schema enforcement varies. If unsupported, guide the model about the expected output structure via the prompt and validate downstream.
jsonSchemaDescription string
Schema description (optional)
Natural-language description of the schema to help the model produce the right fields. Example: "Classify a customer ticket into category and priority."
type string
Default: TEXT. Possible values: TEXT, JSON
Response format type
Specifies how the LLM should return output. Allowed values:
- TEXT (default): free-form natural language.
- JSON: structured output validated against a JSON Schema.
Model Context Protocol (MCP) Docker client tool
image *Required string
Container image
type *Required object
apiVersion string
API version
binds array
Volume binds
command array
MCP client command, as a list of command parts
dockerCertPath string
Docker certificate path
dockerConfig string
Docker configuration
dockerContext string
Docker context
dockerHost string
Docker host
dockerTlsVerify boolean | string
Whether Docker should verify TLS certificates
env object
Environment variables
logEvents boolean | string
Default: false
Whether to log events
registryEmail string
Container registry email
registryPassword string
Container registry password
registryUrl string
Container registry URL
registryUsername string
Container registry username
Google Custom Search web tool
apiKey *Required string
API key
csi *Required string
Custom search engine ID (cx)
type *Required object
Ollama Model Provider
endpoint *Required string
Model endpoint
modelName *Required string
Model name
type *Required object
Code execution tool using Judge0
apiKey *Required string
RapidAPI key for Judge0
You can obtain it from the RapidAPI website.
type *Required object
OpenAI Model Provider
apiKey *Required string
API Key
modelName *Required string
Model name
type *Required object
baseUrl string
API base URL
Web search content retriever for Google Custom Search
apiKey *Required string
API key
csi *Required string
Custom search engine ID (cx)
type *Required object
maxResults integer | string
Default: 3
Maximum number of results
io.kestra.plugin.ai.domain.ChatConfiguration
logRequests boolean | string
Log LLM requests
If true, prompts and configuration sent to the LLM will be logged at INFO level.
logResponses boolean | string
Log LLM responses
If true, raw responses from the LLM will be logged at INFO level.
responseFormat ChatConfiguration-ResponseFormat
Response format
Defines the expected output format. Default is plain text.
Some providers allow requesting JSON or schema-constrained outputs, but support varies and may be incompatible with tool use.
When using a JSON schema, the output will be returned under the key jsonOutput.
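For example, a downstream task could read the parsed result from the task's jsonOutput output; the task ID and field path below are illustrative, reusing the structured-output example above:

- id: log_category
  type: io.kestra.plugin.core.log.Log
  message: "First release is {{ outputs.categorize_releases.jsonOutput.releases[0].semver }}"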
seed integer | string
Seed
Optional random seed for reproducibility. Provide a positive integer (e.g., 42, 1234). Using the same seed with identical settings produces repeatable outputs.
temperature number | string
Temperature
Controls randomness in generation. Typical range is 0.0–1.0. Lower values (e.g., 0.2) make outputs more focused and deterministic, while higher values (e.g., 0.7–1.0) increase creativity and variability.
topK integer | string
Top-K
Limits sampling to the top K most likely tokens at each step. Typical values are between 20 and 100. Smaller values reduce randomness; larger values allow more diverse outputs.
topP number | string
Top-P (nucleus sampling)
Selects from the smallest set of tokens whose cumulative probability is ≤ topP. Typical values are 0.8–0.95. Lower values make the output more focused, higher values increase diversity.
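Taken together, these settings are passed per task under chatConfiguration; a mostly deterministic sketch (all values are illustrative):

chatConfiguration:
  seed: 42          # same seed with identical settings produces repeatable outputs
  temperature: 0.2  # focused, low-randomness generation
  topK: 40          # sample only from the 40 most likely tokens
  topP: 0.9         # nucleus sampling cutoff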
io.kestra.plugin.ai.domain.TokenUsage
inputTokenCount integer
outputTokenCount integer
totalTokenCount integer
Elasticsearch Embedding Store
connection *Required Elasticsearch-ElasticsearchConnection
indexName *Required string
The name of the index to store embeddings
type *Required object
io.kestra.plugin.ai.rag.ChatCompletion-ContentRetrieverConfiguration
maxResults integer
Default: 3
Maximum results to return from the embedding store
minScore number
Default: 0
Minimum similarity score (0-1 inclusive). Only results with score ≥ minScore are returned.
io.kestra.plugin.ai.domain.AIOutput-AIResponse-ToolExecutionRequest
arguments object
Tool request arguments
id string
Tool execution request identifier
name string
Tool name
Azure OpenAI Model Provider
endpoint *Required string
API endpoint
The Azure OpenAI endpoint in the format: https://{resource}.openai.azure.com/
modelName *Required string
Model name
type *Required object
apiKey string
API Key
clientId string
Client ID
clientSecret string
Client secret
serviceVersion string
API version
tenantId string
Tenant ID
Qdrant Embedding Store
apiKey *Required string
The API key
collectionName *Required string
The collection name
host *Required string
The database server host
port *Required integer | string
The database server port
type *Required object
Google VertexAI Model Provider
endpoint *Required string
Endpoint URL
location *Required string
Project location
modelName *Required string
Model name
project *Required string
Project ID
type *Required object
Google Gemini Model Provider
apiKey *Required string
API Key
modelName *Required string
Model name
type *Required object
In-memory embedding store that stores data as Kestra KV pairs
type *Required object
kvName string
Default: {{flow.id}}-embedding-store
The name of the KV pair to use
Model Context Protocol (MCP) SSE client tool
sseUrl *Required string
SSE URL of the MCP server
type *Required object
headers object
Custom headers
Useful, for example, for adding authentication tokens via the Authorization header.
logRequests boolean | string
Default: false
Log requests
logResponses boolean | string
Default: false
Log responses
timeout string (duration)
Connection timeout duration
WebSearch content retriever for Tavily Search
apiKey *Required string
API Key
type *Required object
maxResults integer | string
Default: 3
Maximum number of results to return
Chat Memory backed by Redis
host *Required string
Redis host
The hostname of your Redis server (e.g., localhost or redis-server)
type *Required object
drop string
Default: NEVER. Possible values: NEVER, BEFORE_TASKRUN, AFTER_TASKRUN
Drop memory: never, before, or after the agent's task run
By default, the memory ID is the value of the system.correlationId label, meaning that the same memory will be used by all tasks of the flow and its subflows.
To remove the memory eagerly (before expiration), set drop: AFTER_TASKRUN to erase the memory after the taskrun, or drop: BEFORE_TASKRUN to drop it before the taskrun.
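A minimal sketch of a Redis-backed memory dropped right after the task run, assuming the io.kestra.plugin.ai.memory.Redis type identifier (mirroring the io.kestra.plugin.ai.memory.KestraKVStore type used in the examples); host and port are placeholders:

memory:
  type: io.kestra.plugin.ai.memory.Redis
  host: redis.example.com
  port: 6379
  ttl: PT30M
  drop: AFTER_TASKRUN  # erase the conversation eagerly instead of waiting for the TTL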
memoryId string
Default: {{ labels.system.correlationId }}
Memory ID - defaults to the value of the system.correlationId label. This means that a memory is valid for the entire flow execution, including its subflows.
messages integer | string
Default: 10
Maximum number of messages to keep in memory. If memory is full, the oldest messages will be removed in a FIFO manner. The last system message is always kept.
port integer | string
Default: 6379
Redis port
The port of your Redis server
ttl string (duration)
Default: PT1H
Memory duration - defaults to 1h
Milvus Embedding Store
token *Required string
Token
Milvus auth token. Required if authentication is enabled; omit for local deployments without auth.
type *Required object
autoFlushOnDelete boolean | string
Auto flush on delete
If true, flush after delete operations.
autoFlushOnInsert boolean | string
Auto flush on insert
If true, flush after insert operations. Setting it to false can improve throughput.
collectionName string
Collection name
Target collection. Created automatically if it does not exist. Default: "default".
consistencyLevel string
Consistency level
Read/write consistency level. Common values include STRONG, BOUNDED, or EVENTUALLY (depends on client/version).
databaseName string
Database name
Logical database to use. If not provided, the default database is used.
host string
Host
Milvus host name (used when uri is not set). Default: "localhost".
idFieldName string
ID field name
Field name for document IDs. Default depends on collection schema.
indexType string
Index type
Vector index type (e.g., IVF_FLAT, IVF_SQ8, HNSW). Depends on Milvus deployment and dataset.
metadataFieldName string
Metadata field name
Field name for metadata. Default depends on collection schema.
metricType string
Metric type
Similarity metric (e.g., L2, IP, COSINE). Should match the embedding provider’s expected metric.
password string
Password
Required when authentication/TLS is enabled. See https://milvus.io/docs/authenticate.md
port integer | string
Port
Milvus port (used when uri is not set). Typical: 19530 (gRPC) or 9091 (HTTP). Default: 19530.
retrieveEmbeddingsOnSearch boolean | string
Retrieve embeddings on search
If true, return stored embeddings along with matches. Default: false.
textFieldName string
Text field name
Field name for original text. Default depends on collection schema.
uri string
URI
Connection URI. Use either uri OR host/port (not both).
Examples:
- gRPC (typical): "milvus://host:19530"
- HTTP: "http://host:9091"
username string
Username
Required when authentication/TLS is enabled. See https://milvus.io/docs/authenticate.md
vectorFieldName string
Vector field name
Field name for the embedding vector. Must match the index definition and embedding dimensionality.
Anthropic AI Model Provider
apiKey *Required string
API Key
modelName *Required string
Model name
type *Required object
WebSearch tool for Tavily Search
apiKey *Required string
Tavily API Key - you can obtain one from the Tavily website
type *Required object
In-memory Chat Memory that stores its data as Kestra KV pairs
type *Required object
drop string
Default: NEVER. Possible values: NEVER, BEFORE_TASKRUN, AFTER_TASKRUN
Drop memory: never, before, or after the agent's task run
By default, the memory ID is the value of the system.correlationId label, meaning that the same memory will be used by all tasks of the flow and its subflows.
To remove the memory eagerly (before expiration), set drop: AFTER_TASKRUN to erase the memory after the taskrun, or drop: BEFORE_TASKRUN to drop it before the taskrun.
memoryId string
Default: {{ labels.system.correlationId }}
Memory ID - defaults to the value of the system.correlationId label. This means that a memory is valid for the entire flow execution, including its subflows.
messages integer | string
Default: 10
Maximum number of messages to keep in memory. If memory is full, the oldest messages will be removed in a FIFO manner. The last system message is always kept.
ttl string (duration)
Default: PT1H
Memory duration - defaults to 1h
Weaviate Embedding Store
apiKey *Required string
API key
Weaviate API key. Omit for local deployments without auth.
host *Required string
Host
Cluster host name without protocol, e.g., "abc123.weaviate.network".
type *Required object
avoidDups boolean | string
Avoid duplicates
If true (default), a hash-based ID is derived from each text segment to prevent duplicates. If false, a random ID is used.
consistencyLevel string
Possible values: ONE, QUORUM, ALL
Consistency level
Write consistency: ONE, QUORUM (default), or ALL.
grpcPort integer | string
gRPC port
Port for gRPC if enabled (e.g., 50051).
metadataFieldName string
Metadata field name
Field used to store metadata. Defaults to "_metadata" if not set.
metadataKeys array
Metadata keys
The list of metadata keys to store - if not provided, it will default to an empty list.
objectClass string
Object class
Weaviate class to store objects in (must start with an uppercase letter). Defaults to "Default" if not set.
port integer | string
Port
Optional port (e.g., 443 for https, 80 for http). Leave unset to use provider defaults.
scheme string
Scheme
Cluster scheme: "https" (recommended) or "http".
securedGrpc boolean | string
Secure gRPC
Whether the gRPC connection is secured (TLS).
useGrpcForInserts boolean | string
Use gRPC for batch inserts
If true, use gRPC for batch inserts. HTTP remains required for search operations.
io.kestra.plugin.ai.embeddings.Elasticsearch-ElasticsearchConnection
hosts *Required array
Minimum items: 1
List of HTTP Elasticsearch servers
Must be a URI like https://example.com:9200, with scheme and port
basicAuth Elasticsearch-ElasticsearchConnection-BasicAuth
Basic authorization configuration
headers array
List of HTTP headers to be sent with every request
Each item is a key: value string, e.g., Authorization: Token XYZ
pathPrefix string
Path prefix for all HTTP requests
If set to /my/path, each client request becomes /my/path/ + endpoint. Useful when Elasticsearch is behind a proxy providing a base path; do not use otherwise.
strictDeprecationMode boolean | string
Treat responses with deprecation warnings as failures
trustAllSsl boolean | string
Trust all SSL CA certificates
Use this if the server uses a self-signed SSL certificate
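A connection sketch for an Elasticsearch cluster served behind a proxy base path, using the properties above; the URL, credentials, index name, and prefix are placeholders:

embeddings:
  type: io.kestra.plugin.ai.embeddings.Elasticsearch
  indexName: kestra-embeddings
  connection:
    hosts:
      - https://es.example.com:9200
    basicAuth:
      username: kestra
      password: "{{ kv('ES_PASSWORD') }}"
    pathPrefix: /search  # requests become /search/<endpoint>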
Amazon Bedrock Model Provider
accessKeyId *Required string
AWS Access Key ID
modelName *Required string
Model name
secretAccessKey *Required string
AWS Secret Access Key
type *Required object
modelType string
Default: COHERE. Possible values: COHERE, TITAN
Amazon Bedrock Embedding Model Type