CaddisFly: Pipeline Orchestration for AI Agents
AI agents are great at reasoning, but at some point they need to actually do things — build code, run tests, deploy services, fetch data. CaddisFly is the pipeline orchestration engine we built for OpenCaddis: a plugin that lets agents define and execute multi-step command pipelines with parallel execution, approval gates, retry logic, and a safety system that keeps them from doing anything destructive.
In this post, I'll walk through CaddisFly from the ground up — its origins as a derivative of the OpenClaw Lobster plugin, the architecture we built, the inline DSL and YAML workflow systems, the safety validator, and the approval gate mechanism that gives humans a checkpoint in automated pipelines.
Origins: From OpenClaw Lobster to CaddisFly
CaddisFly didn't start from scratch. Its lineage traces back to the Lobster plugin in the OpenClaw project — an earlier pipeline execution system designed for command orchestration in agent environments. Lobster established several ideas that carried forward: pipe-delimited command chaining, output piping between steps, and the concept of an agent-accessible tool that wraps subprocess execution with safety controls.
When we started building OpenCaddis on the FabrCore framework, we needed pipeline orchestration that fit the FabrCore plugin model — tools exposed to the LLM via IFabrPlugin, configuration through agent args, and integration with FabrCore's messaging system for progress reporting. Rather than port Lobster wholesale, we rebuilt the pipeline engine using Lobster's core design principles as the foundation, adapting and extending them for the OpenCaddis architecture.
CaddisFly is a derivative of the OpenClaw Lobster plugin. The pipe-delimited DSL syntax, the step-by-step execution model, the subprocess isolation pattern, and the safety blocklist concept all originated in Lobster. CaddisFly builds on these foundations with YAML workflows, parallel execution, approval gates, variable substitution, run state persistence, and FabrCore plugin integration.
What CaddisFly added beyond Lobster:
- YAML workflow files — reusable, version-controlled pipeline definitions with variable substitution
- Parallel execution groups — run multiple steps concurrently with Task.WhenAll
- Approval gates — pause pipelines for human review before continuing
- Run state persistence — paused runs survive container restarts via file-based storage
- Progress reporting — real-time step-by-step updates through FabrCore's thinking notification system
- Retry logic with delay — configurable retries per step with delay between attempts
Architecture: Seven Components, One Plugin
CaddisFly is split across seven components, each with a focused responsibility:
| Component | File | Role |
|---|---|---|
| CaddisFlyPlugin | Plugins/CaddisFlyPlugin.cs | FabrCore plugin entry point — exposes 7 tools to the agent, handles configuration, wires dependencies |
| PipelineParser | CaddisFly/PipelineParser.cs | Parses inline DSL strings into CaddisFlyPipeline objects with tokenization and variable substitution |
| WorkflowFileLoader | CaddisFly/WorkflowFileLoader.cs | Loads YAML workflow files from disk, converts them to pipeline objects, applies variable overrides |
| CommandExecutor | CaddisFly/CommandExecutor.cs | Executes individual steps via subprocess or built-in handlers with timeout enforcement |
| CommandSafetyValidator | CaddisFly/CommandSafetyValidator.cs | Blocklist-based safety checks — rejects destructive, eval-based, and system-path commands |
| CaddisFlyRuntimeService | Services/CaddisFlyRuntimeService.cs | Orchestrates execution loops, approval gate pausing/resuming, retry logic, parallel group dispatch |
| CaddisFlyRunStore | Services/CaddisFlyRunStore.cs | File-based persistence for run state and step logs, with 24-hour retention cleanup |
The data flows through the system like this: a tool call arrives at CaddisFlyPlugin, the PipelineParser or WorkflowFileLoader produces a CaddisFlyPipeline, the CaddisFlyRuntimeService walks its steps, each command passes the CommandSafetyValidator before the CommandExecutor runs it, and the CaddisFlyRunStore persists run state along the way.
The Inline DSL: Pipe-Delimited Pipelines
The simplest way to use CaddisFly is through the inline DSL — a pipe-delimited string where each segment is a step. The agent calls RunPipeline with a string, and the PipelineParser turns it into a structured CaddisFlyPipeline object.
RunPipeline "dotnet restore | dotnet build -c Release | dotnet test -c Release"
The parser handles this through a two-phase process: first splitting on unquoted pipe characters, then tokenizing each segment with full quote awareness:
private static List<string> SplitPipeSegments(string input)
{
    var segments = new List<string>();
    var current = new StringBuilder();
    var inSingle = false;
    var inDouble = false;

    for (var i = 0; i < input.Length; i++)
    {
        var c = input[i];
        if (c == '\'' && !inDouble)
        {
            inSingle = !inSingle;
            current.Append(c); // keep the quote so the tokenizer can see it
        }
        else if (c == '"' && !inSingle)
        {
            inDouble = !inDouble;
            current.Append(c); // keep the quote so the tokenizer can see it
        }
        else if (c == '|' && !inSingle && !inDouble)
        {
            segments.Add(current.ToString());
            current.Clear();
        }
        else current.Append(c);
    }

    segments.Add(current.ToString());
    return segments;
}
This character-by-character approach is deliberate. A simple string.Split('|') would break on pipe characters inside quoted arguments — like bash --command 'echo hello | grep hello'. The parser tracks quote state so pipes inside strings are preserved.
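For readers who want to poke at the behavior, here is the same state machine as a standalone Python sketch — an illustrative port, not CaddisFly's actual parser. Quotes are kept inside the segments so a downstream tokenizer can still see them:

```python
def split_pipe_segments(text: str) -> list[str]:
    """Split on '|' only when outside single/double quotes."""
    segments: list[str] = []
    current: list[str] = []
    in_single = in_double = False
    for c in text:
        if c == "'" and not in_double:
            in_single = not in_single
            current.append(c)  # preserve the quote for the tokenizer
        elif c == '"' and not in_single:
            in_double = not in_double
            current.append(c)  # preserve the quote for the tokenizer
        elif c == "|" and not in_single and not in_double:
            segments.append("".join(current).strip())
            current = []
        else:
            current.append(c)
    segments.append("".join(current).strip())
    return segments
```

(This sketch trims whitespace around segments for readability; the trimming is incidental to the quote-tracking idea.)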
Each segment is then tokenized using a generated regex that respects both single and double quotes, with quotes stripped from the returned values:
[GeneratedRegex("""\"(?<dq>[^\"]*)\"|'(?<sq>[^']*)'|[^\s]+""")]
private static partial Regex TokenRegex();
The tokenizer produces a flat list of tokens, and the parser walks through them extracting --key value pairs into a dictionary, along with special flags like --timeout, --retries, and --retry-delay. The result is a PipelineStep with everything the executor needs:
public sealed class PipelineStep
{
    public required string Name { get; init; }
    public required string Command { get; init; }
    public Dictionary<string, string> Args { get; init; } = [];
    public int? TimeoutSeconds { get; init; }
    public int Retries { get; init; }
    public int RetryDelaySeconds { get; init; } = 2;
    public string? ApprovalPrompt { get; init; }
    public List<PipelineStep>? ParallelSteps { get; init; }

    public bool IsParallelGroup => ParallelSteps is { Count: > 0 };
}
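The token walk itself isn't shown in the C# excerpt, so here is a hedged Python sketch of the idea — the field names and reserved-flag handling are simplifications for illustration, not CaddisFly's actual code:

```python
def parse_segment(tokens: list[str]) -> dict:
    """Walk a flat token list: first token is the command, then
    --key value pairs, with --timeout/--retries/--retry-delay reserved."""
    step = {"command": tokens[0], "args": {}, "timeout": None,
            "retries": 0, "retry_delay": 2}
    i = 1
    while i < len(tokens):
        tok = tokens[i]
        if tok.startswith("--") and i + 1 < len(tokens):
            key, value = tok[2:], tokens[i + 1]
            if key == "timeout":
                step["timeout"] = int(value)
            elif key == "retries":
                step["retries"] = int(value)
            elif key == "retry-delay":
                step["retry_delay"] = int(value)
            else:
                step["args"][key] = value  # ordinary --key value pair
            i += 2
        else:
            i += 1  # skip stray tokens
    return step
```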
YAML Workflows: Reusable Pipeline Definitions
For pipelines that get reused — build-and-test, deploy, health checks — the inline DSL gets unwieldy. YAML workflow files let you define pipelines as structured documents with names, descriptions, variables, and step definitions:
name: deploy
description: Build, test, approve, and deploy
variables:
  config: Release
  target: production
steps:
  - name: Restore
    command: dotnet
    args:
      command: restore
  - name: Build
    command: dotnet
    args:
      command: build -c {{config}}
  - name: Test
    command: dotnet
    args:
      command: test -c {{config}}
    retries: 2
  - name: Approve
    command: approve
    approval_prompt: Deploy to {{target}}?
  - name: Deploy
    command: dotnet
    args:
      command: publish -c {{config}} -o ./publish
The WorkflowFileLoader handles loading and conversion. It uses YamlDotNet for deserialization, converts the YAML structure into a CaddisFlyPipeline, and applies variable substitution using the same PipelineParser.ApplyVariables method that the DSL uses. Variables defined in the YAML serve as defaults; runtime overrides passed via the RunWorkflow tool take precedence.
public CaddisFlyPipeline Load(
    string workflowName,
    Dictionary<string, string>? variables = null)
{
    var filePath = ResolveWorkflowPath(workflowName);
    var yaml = File.ReadAllText(filePath);
    var def = YamlDeserializer.Deserialize<WorkflowDefinition>(yaml);
    var pipeline = ConvertToPipeline(def);

    // Apply variables from the file, then runtime overrides
    PipelineParser.ApplyVariables(pipeline, variables);
    return pipeline;
}
Workflow resolution is flexible — you can reference a workflow by filename (with or without the .yaml extension) or by the name field inside the YAML file itself. The loader searches the configured workflow directory and falls back to name-field matching if the filename doesn't match directly.
Variables use {{name}} syntax and are replaced in all step args. The regex \{\{(\w+)\}\} handles the substitution. Unresolved variables are left as-is — no errors, no silent failures. This lets you define optional variables that only get substituted when provided.
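That substitution rule — replace known names, leave unknown ones verbatim — fits in a few lines; a Python sketch of the same regex-driven replacement:

```python
import re

VAR_PATTERN = re.compile(r"\{\{(\w+)\}\}")

def apply_variables(text: str, variables: dict[str, str]) -> str:
    """Replace {{name}} with its value; unresolved names stay as-is."""
    return VAR_PATTERN.sub(
        lambda m: variables.get(m.group(1), m.group(0)), text)
```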
Command Execution: Subprocess Isolation
The CommandExecutor is where pipelines meet the operating system. It handles three categories of steps: built-in commands, shell-style executables, and direct executables.
Built-in Commands
Three commands run inside the engine without spawning a subprocess:
| Command | Purpose |
|---|---|
| echo | Output a message — useful for logging and debugging |
| set-var | Set a pipeline variable for subsequent steps |
| approve | Pause for human approval (handled by the runtime, not the executor) |
Shell-Style Executables
Commands like bash, python3, node, and pwsh receive their arguments via -c "command". PowerShell gets special treatment with -NoProfile -NonInteractive flags:
if (executable == "pwsh")
{
    psi.FileName = "pwsh";
    psi.Arguments = $"-NoProfile -NonInteractive -Command \"{escaped}\"";
}
else if (ShellStyleExecutables.Contains(executable))
{
    // bash, python3, python, node — use -c "command"
    psi.FileName = executable;
    psi.Arguments = $"-c \"{commandStr}\"";
}
else
{
    // docker, curl, git, dotnet, npm — args passed directly
    psi.FileName = executable;
    psi.Arguments = commandStr;
}
Direct Executables
Commands like dotnet, git, curl, docker, and npm receive arguments directly — no shell wrapper. The executor maintains a command registry that maps logical names to executables, and custom commands can be added via configuration:
_commands = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
{
    ["powershell"] = "pwsh",
    ["bash"] = "bash",
    ["docker"] = "docker",
    ["python"] = "python3",
    ["node"] = "node",
    ["curl"] = "curl",
    ["git"] = "git",
    ["dotnet"] = "dotnet",
    ["npm"] = "npm",
};

// Custom commands merge in (overrides allowed)
if (customCommands is not null)
    foreach (var (key, value) in customCommands)
        _commands[key] = value;
Output Piping
Each step receives the previous step's stdout as stdin. The executor writes the previous output to the process's standard input before closing it, creating a true pipe-like behavior between steps. Output is captured asynchronously via OutputDataReceived and ErrorDataReceived events, and truncated at the configurable MaxOutputLength (default 10,000 characters) to prevent token bloat when the agent receives the result.
Timeout Enforcement
Each step has a timeout (configurable per-step or via the global default). The executor uses CancellationTokenSource.CreateLinkedTokenSource to combine the step timeout with any external cancellation, and kills the entire process tree on timeout:
using var timeoutCts = new CancellationTokenSource(
    TimeSpan.FromSeconds(timeoutSeconds));
using var linkedCts = CancellationTokenSource
    .CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);

try
{
    await process.WaitForExitAsync(linkedCts.Token);
}
catch (OperationCanceledException)
    when (timeoutCts.IsCancellationRequested)
{
    try { process.Kill(entireProcessTree: true); }
    catch { /* best-effort */ }
}
The Safety Validator: What Agents Can't Do
Giving an AI agent access to subprocess execution is powerful and dangerous. The CommandSafetyValidator is the guardrail — a pattern-matching blocklist that rejects commands before they ever reach a process.
Every command string is normalized to lowercase and checked against blocked patterns:
public static string? CheckCommand(string input)
{
    var normalized = input.Trim().ToLowerInvariant();

    foreach (var pattern in BlockedPatterns)
    {
        if (normalized.Contains(pattern))
            return $"Blocked: Command contains disallowed pattern '{pattern}'.";
    }

    // ... additional checks for system paths, registry, critical services
    return null;
}
The blocklist covers five categories of dangerous operations:
| Category | Examples | Why Blocked |
|---|---|---|
| Destructive commands | rm -rf /, format-volume, dd if=, shutdown | Irreversible data loss or system damage |
| Eval / dynamic execution | invoke-expression, iex, start-process powershell | Bypasses the safety validator by spawning a child process |
| Language-specific exploits | os.system("rm"), require('child_process') | Escapes subprocess isolation through language runtimes |
| Curl piped to shell | curl \| bash, wget \| sh | Remote code execution via download-and-execute |
| Git force operations | push --force origin main, push -f origin master | Destructive history rewriting on protected branches |
Beyond the pattern blocklist, the validator has special-case checks for recursive deletion on system paths (c:\windows, /usr/bin, /etc), registry modification via HKLM:, and stopping critical Windows services (lsass, csrss, svchost).
Commands registered via the CustomCommands configuration still pass through the safety validator. You can add terraform or kubectl as custom commands, but the arguments they receive are still checked against the blocklist. You cannot bypass safety by wrapping a dangerous command in a custom alias.
The Runtime: Execution, Retries, and Approval Gates
The CaddisFlyRuntimeService is the orchestrator. It takes a parsed pipeline and walks through the steps sequentially, handling three special cases: approval gates, parallel groups, and retries.
Approval Gates
When the runtime encounters an approve step, it pauses execution and returns a NeedsApproval status with a resume token. The pipeline state is persisted to disk so it survives container restarts:
if (step.Command.Equals("approve", StringComparison.OrdinalIgnoreCase))
{
    state.ApprovalPrompt = step.ApprovalPrompt
        ?? "Approval required to continue.";
    state.Status = CaddisFlyStatus.NeedsApproval;
    state.NextStepIndex++;
    await _store.SaveAsync(state);
    return BuildEnvelope(state);
}
On the plugin side, approval is even more interesting. The CaddisFlyPlugin registers a TaskCompletionSource with the runtime and sends a special caddisfly-approval message to the client. The tool call blocks — the LLM doesn't get a response — until the user approves or denies in the UI, or 10 minutes pass:
// Register waiter BEFORE sending card to avoid a race
var tcs = _runtime.RegisterApprovalWaiter(envelope.RunId);

await _host.SendMessage(new AgentMessage
{
    ToHandle = clientHandle,
    MessageType = "caddisfly-approval",
    Message = envelope.ApprovalPrompt,
    Args = new Dictionary<string, string>
    {
        ["resumeToken"] = envelope.ResumeToken,
        // ... step progress info
    }
});

// Block the tool call until user responds (10-minute timeout)
envelope = await tcs.Task.WaitAsync(TimeSpan.FromMinutes(10));
The approval waiter is registered before the message is sent to prevent a race condition where the user approves before the waiter exists. This is the kind of subtle concurrency bug that only appears under real load.
For pipelines with multiple approval gates, the plugin loops — after each approval resolves, it checks if the new status is another NeedsApproval and repeats the process. A single RunPipeline tool call can block through multiple approval gates without returning to the LLM in between.
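The register-before-send pattern generalizes beyond C#. Here it is reduced to an asyncio sketch, with a Future standing in for the TaskCompletionSource — class and method names are illustrative, not CaddisFly's API:

```python
import asyncio

class ApprovalWaiters:
    """Maps run IDs to futures, mirroring the waiter registry."""

    def __init__(self) -> None:
        self._waiters: dict[str, asyncio.Future] = {}

    def register(self, run_id: str) -> asyncio.Future:
        # Register BEFORE notifying the user, so an instant approval
        # always finds a waiter to resolve.
        fut = asyncio.get_running_loop().create_future()
        self._waiters[run_id] = fut
        return fut

    def resolve(self, run_id: str, approved: bool) -> None:
        fut = self._waiters.pop(run_id, None)
        if fut is not None and not fut.done():
            fut.set_result(approved)

async def demo() -> bool:
    waiters = ApprovalWaiters()
    fut = waiters.register("run-1")
    waiters.resolve("run-1", True)  # user clicks Approve immediately
    return await asyncio.wait_for(fut, timeout=600)  # 10-minute cap
```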
Parallel Execution
Parallel groups use Task.WhenAll to execute multiple steps concurrently. Each step in the group receives the same previous output (not each other's output), and the pipeline waits for all to complete:
private static async Task<List<CaddisFlyStepResult>> ExecuteParallelGroupAsync(
    PipelineStep group,
    string previousOutput,
    CommandExecutor executor,
    CancellationToken ct)
{
    var tasks = group.ParallelSteps!.Select(step =>
        ExecuteWithRetryAsync(step, previousOutput, executor, ct));
    var results = await Task.WhenAll(tasks);
    return [.. results];
}
If any step in a parallel group fails, the entire group is marked as failed and the pipeline stops. Outputs from successful parallel steps are merged with a \n---\n separator and passed to the next sequential step.
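Those semantics — same input to every branch, fail-fast on any failure, outputs merged with a \n---\n separator — can be sketched in a few lines of Python (the step coroutines and the None-as-failure convention are assumptions of the sketch):

```python
import asyncio

async def run_parallel_group(steps, previous_output: str) -> str:
    """Run steps concurrently; each branch sees the same previous
    output, and successful outputs are merged with \\n---\\n."""
    results = await asyncio.gather(
        *(step(previous_output) for step in steps))
    if any(r is None for r in results):  # None models a failed branch
        raise RuntimeError("parallel group failed")
    return "\n---\n".join(results)
```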
There's a safety constraint: approval commands are not allowed inside parallel groups. The WorkflowFileLoader validates this at parse time and throws a FormatException — pausing one branch of a parallel group while others continue would create unpredictable state.
Retry Logic
Steps can specify a retry count and delay. The runtime runs up to 1 + Retries attempts, with the configurable delay between each. Each attempt is tagged with its attempt number in the result, and the progress reporter notifies the user of retries in real time:
for (var attempt = 1; attempt <= maxAttempts; attempt++)
{
    result = await executor.ExecuteStepAsync(
        step, state.LastOutput, linkedCts.Token);
    result = result with { Attempt = attempt };

    var stepSucceeded = result.ExitCode == 0;
    if (stepSucceeded) break;

    if (attempt < maxAttempts)
    {
        await reporter.ReportStepRetryingAsync(
            stepIndex, totalSteps, step.Name,
            attempt + 1, maxAttempts);
        await Task.Delay(
            TimeSpan.FromSeconds(step.RetryDelaySeconds),
            linkedCts.Token);
    }
}
Progress Reporting: Thinking Notifications
Pipeline execution can take minutes — builds compile, tests run, API calls wait for responses. Without progress feedback, the user sees nothing while the LLM tool call blocks.
CaddisFly uses an ICaddisFlyProgressReporter interface with three hooks — step starting, step completed, and step retrying. The default implementation, ThinkingProgressReporter, sends real-time updates through FabrCore's thinking notification system:
public async Task ReportStepStartingAsync(
    int stepIndex, int totalSteps, string stepName)
{
    await ThinkingNotifier.SendThinkingAsync(_host,
        $"[{stepIndex + 1}/{totalSteps}] Running: {stepName}...");
}

public async Task ReportStepCompletedAsync(
    int stepIndex, int totalSteps, string stepName, bool success)
{
    var status = success ? "completed" : "failed";
    await ThinkingNotifier.SendThinkingAsync(_host,
        $"[{stepIndex + 1}/{totalSteps}] Step {stepName} {status}");
}
In the UI, the user sees a live progress stream:
Run the deploy workflow for staging.
[1/5] Running: Restore...
[1/5] Step Restore completed
[2/5] Running: Build...
[2/5] Step Build completed
[3/5] Running: Test...
Deploy to staging? Approve Deny
The Response Envelope
Every CaddisFly operation returns a CaddisFlyEnvelope — a structured JSON response that gives the agent everything it needs to report results or take next steps:
public sealed class CaddisFlyEnvelope
{
    public required string RunId { get; init; }
    public CaddisFlyStatus Status { get; set; }
    public string Output { get; set; } = "";
    public string? Error { get; set; }
    public string? ResumeToken { get; set; }
    public string? ApprovalPrompt { get; set; }
    public List<CaddisFlyStepResult> Steps { get; init; } = [];
    public double TotalDurationMs { get; set; }
}
The five statuses tell the agent exactly what happened:
| Status | Meaning | Agent Action |
|---|---|---|
| Ok | All steps completed successfully | Report the output to the user |
| NeedsApproval | Paused at an approval gate | Present the approval prompt; call ResumeRun when resolved |
| Cancelled | User cancelled or denied approval | Inform the user the pipeline was cancelled |
| Error | A step failed after all retries | Report the error; suggest fixes |
| TimedOut | A step exceeded its timeout | Report the timeout; suggest increasing the limit |
Each step result includes the step name, command, exit code, stdout output, stderr error, duration in milliseconds, and attempt number. The agent can read these to provide detailed feedback — "Step 3 (Test) failed on attempt 2 after 45 seconds with exit code 1" rather than a generic "pipeline failed."
Run Persistence and History
CaddisFly persists run state and step logs to disk via the CaddisFlyRunStore. This serves two purposes: paused runs at approval gates survive container restarts, and completed runs provide an audit trail.
The store writes JSON state files and per-step log files under a runs subdirectory:
{WorkingDirectory}/runs/
    abc123def456.json                  # Run state (pipeline, status, completed steps)
    abc123def456/step-000-Restore.log  # Step 1 output
    abc123def456/step-001-Build.log    # Step 2 output
    abc123def456/step-002-Test.log     # Step 3 output
At startup, the runtime calls RestorePausedRunsAsync() to load any runs that were paused at approval gates when the container last stopped. It also runs a cleanup pass to remove runs older than 24 hours — a simple retention policy that prevents disk growth without requiring manual cleanup.
The agent can access run history through the GetRunStatus and GetRunLogs tools. GetRunLogs reads the per-step log files and returns them in a formatted output that's easy for the LLM to parse and summarize for the user.
Configuration
CaddisFly is configured through the agent's plugin args in opencaddis.json:
| Setting | Default | Description |
|---|---|---|
| CaddisFly:WorkingDirectory | {TempPath}/OpenCaddis/ | Base directory for command execution and run logs |
| CaddisFly:WorkflowPath | {TempPath}/OpenCaddis/workflows/ | Directory containing YAML workflow files |
| CaddisFly:TimeoutSeconds | 60 | Default timeout per step (overridable per-step) |
| CaddisFly:MaxOutputLength | 10000 | Maximum characters captured per step output |
| CaddisFly:CustomCommands | (empty) | Semicolon-separated list of custom executables (e.g. terraform=terraform;kubectl=kubectl) |
{
  "Agents": [
    {
      "Handle": "devops",
      "Plugins": ["CaddisFly", "FileSystem"],
      "Args": {
        "CaddisFly:WorkingDirectory": "C:\\Projects\\MyApp",
        "CaddisFly:WorkflowPath": "C:\\Projects\\MyApp\\workflows",
        "CaddisFly:TimeoutSeconds": "120",
        "CaddisFly:MaxOutputLength": "20000",
        "CaddisFly:CustomCommands": "terraform=terraform;kubectl=kubectl;az=az"
      }
    }
  ]
}
When running in Docker, point WorkingDirectory and WorkflowPath to a mounted volume (e.g. /working/caddisfly/) so workflows, run logs, and paused state persist across container restarts. See the Docker setup guide for the full volume mapping.
Limits and Guardrails
Beyond the safety validator, CaddisFly enforces operational limits to keep things predictable:
| Limit | Value |
|---|---|
| Max steps per pipeline | 50 |
| Max parallel steps per group | 10 |
| Max output per step | 10,000 chars (configurable) |
| Default timeout per step | 60 seconds (configurable) |
| Max retries per step | 5 |
| Max concurrent runs per agent | 5 |
| Approval gate timeout | 10 minutes |
| Run retention | 24 hours |
Looking Ahead
CaddisFly today handles the core pipeline orchestration needs — sequential execution, parallel groups, approval gates, retries, and safety controls. There are areas we're exploring: conditional step execution based on previous step output, pipeline composition (calling one workflow from another), webhook-triggered pipelines, and richer output handling that lets steps produce structured data instead of just text.
The foundation that started with OpenClaw's Lobster has grown into a full pipeline engine that fits naturally into the FabrCore plugin model. It's the tool that lets agents move from "I think you should build and deploy this" to "I'll build and deploy it, and I'll ask your permission before the deploy goes live."
Check out the CaddisFly documentation for the full reference, and the OpenCaddis source on GitHub to see the complete implementation.
Builder of OpenCaddis and the FabrCore framework.