Custom MCP Client Integration
This guide shows you how to build a custom MCP (Model Context Protocol) client that connects to the Orchex MCP server. This is useful if you're building your own AI-powered tools or integrating Orchex into custom workflows.
Overview
The Model Context Protocol (MCP) is an open protocol that standardizes how AI applications provide context to large language models. Orchex implements an MCP server that exposes its manifest execution capabilities through this protocol.
What You'll Build
A custom client that:
- Connects to the Orchex MCP server
- Lists available tools (orchestration operations)
- Executes orchestrations programmatically
- Handles streaming responses and artifacts
Prerequisites
- Node.js 18+ or Python 3.10+
- Orchex installed (`npm install -g @wundam/orchex`)
- API key for your preferred LLM provider (Anthropic, OpenAI, Gemini, or Ollama)
- Familiarity with async programming (async/await)
- Familiarity with TypeScript or Python
MCP Protocol Basics
Architecture
┌───────────────────┐           ┌──────────────────┐
│    Your Client    │ ◄───────► │  Orchex Server   │
│                   │    MCP    │                  │
│ - Tool listing    │           │ - init           │
│ - Tool calling    │           │ - execute        │
│ - Result handling │           │ - status / learn │
└───────────────────┘           └──────────────────┘

Communication Flow
1. Initialize: Client starts the Orchex MCP server process
2. List Tools: Client requests available tools from the server
3. Call Tool: Client invokes a tool with parameters
4. Stream Response: Server streams execution progress
5. Handle Artifacts: Client processes generated files
Transport
MCP uses stdio transport - the server runs as a subprocess, and communication happens over stdin/stdout using JSON-RPC 2.0 messages.
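The SDK handles this framing for you, but it helps to see the shape of the traffic. The sketch below is illustrative only: the exact fields in a real `tools/list` exchange depend on the protocol and SDK version.

```typescript
// Sketch of the JSON-RPC 2.0 messages exchanged over stdio.
// The MCP SDK builds and parses these for you; shown only to
// illustrate the framing, not as an exact wire capture.
interface JsonRpcRequest {
  jsonrpc: "2.0";
  id: number;
  method: string;
  params?: Record<string, unknown>;
}

const listToolsRequest: JsonRpcRequest = {
  jsonrpc: "2.0",
  id: 1,
  method: "tools/list",
};

// A response echoes the request id and carries a result payload.
const listToolsResponse = {
  jsonrpc: "2.0" as const,
  id: 1,
  result: {
    tools: [{ name: "execute", description: "Run the active orchestration" }],
  },
};

// Each message travels as serialized JSON over stdin/stdout.
const wire = JSON.stringify(listToolsRequest);
const parsed = JSON.parse(wire) as JsonRpcRequest;
console.log(parsed.method); // → "tools/list"
```

Because the transport is just a subprocess pipe, any language that can spawn a process and speak JSON can implement a client.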
Provider Configuration
Before starting the Orchex MCP server, configure your LLM provider via environment variables:
# Anthropic (Claude)
export ANTHROPIC_API_KEY="sk-ant-..."
# OpenAI (GPT-4, GPT-4.5)
export OPENAI_API_KEY="sk-..."
# Google Gemini
export GEMINI_API_KEY="AI..."
# Ollama (local models)
export OLLAMA_BASE_URL="http://localhost:11434"
# Optional: Force specific provider
export ORCHEX_PROVIDER="openai"

Orchex auto-detects the provider based on which API key is set. Priority: Anthropic > OpenAI > Gemini > Ollama.
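The priority rule above can be sketched as a simple first-match check. This is a hypothetical illustration of the selection order, not Orchex's actual detection code, which may differ.

```typescript
// Hypothetical sketch of the provider auto-detection order described
// above: an explicit ORCHEX_PROVIDER override wins, otherwise the
// first configured provider in priority order is chosen.
type Provider = "anthropic" | "openai" | "gemini" | "ollama";

function detectProvider(
  env: Record<string, string | undefined>
): Provider | null {
  if (env.ORCHEX_PROVIDER) return env.ORCHEX_PROVIDER as Provider;
  if (env.ANTHROPIC_API_KEY) return "anthropic";
  if (env.OPENAI_API_KEY) return "openai";
  if (env.GEMINI_API_KEY) return "gemini";
  if (env.OLLAMA_BASE_URL) return "ollama";
  return null; // no provider configured
}

console.log(detectProvider({ OPENAI_API_KEY: "sk-x", GEMINI_API_KEY: "AI-x" }));
// → "openai" (higher priority than Gemini)
```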
TypeScript/Node.js Implementation
Installation
npm install @modelcontextprotocol/sdk

Basic Client
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
interface OrchexClientOptions {
provider?: 'anthropic' | 'openai' | 'gemini' | 'ollama';
apiKey?: string;
ollamaBaseUrl?: string;
}
class OrchexMCPClient {
private client!: Client;
private transport!: StdioClientTransport;
private options: OrchexClientOptions;
constructor(options: OrchexClientOptions = {}) {
this.options = options;
}
async connect() {
// Build environment variables for the provider
// (cast needed because process.env values are string | undefined)
const env = { ...process.env } as Record<string, string>;
if (this.options.provider) {
env.ORCHEX_PROVIDER = this.options.provider;
}
if (this.options.apiKey) {
switch (this.options.provider) {
case 'anthropic':
env.ANTHROPIC_API_KEY = this.options.apiKey;
break;
case 'openai':
env.OPENAI_API_KEY = this.options.apiKey;
break;
case 'gemini':
env.GEMINI_API_KEY = this.options.apiKey;
break;
}
}
if (this.options.ollamaBaseUrl) {
env.OLLAMA_BASE_URL = this.options.ollamaBaseUrl;
}
// Create transport using server's stdin/stdout with provider config
this.transport = new StdioClientTransport({
command: "npx",
args: ["-y", "@wundam/orchex"],
env,
});
// Initialize MCP client
this.client = new Client(
{
name: "my-orchex-client",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
}
);
// Connect to server
await this.client.connect(this.transport);
console.log("Connected to Orchex MCP server");
}
async listTools() {
const response = await this.client.listTools();
return response.tools;
}
async executeOrchestration(mode: string = "auto") {
const result = await this.client.callTool({
name: "execute",
arguments: {
mode,
},
});
return result;
}
async close() {
await this.client.close();
}
}
// Usage with different providers
// OpenAI
const openaiClient = new OrchexMCPClient({
provider: 'openai',
apiKey: process.env.OPENAI_API_KEY,
});
await openaiClient.connect();
// Anthropic
const claudeClient = new OrchexMCPClient({
provider: 'anthropic',
apiKey: process.env.ANTHROPIC_API_KEY,
});
// Ollama (local)
const ollamaClient = new OrchexMCPClient({
provider: 'ollama',
ollamaBaseUrl: 'http://localhost:11434',
});
// Execute with chosen provider
const tools = await openaiClient.listTools();
console.log("Available tools:", tools);
const result = await openaiClient.executeOrchestration("auto");
console.log("Execution result:", result);
await openaiClient.close();

Advanced: Streaming Progress
import { EventEmitter } from "events";
class StreamingOrchexClient extends OrchexMCPClient {
private events = new EventEmitter();
async executeWithProgress(mode: string = "auto") {
// Execute orchestration with the requested mode
const result = await this.client.callTool({
name: "execute",
arguments: {
mode,
},
});
// Parse result for artifacts and status
if (result.content && Array.isArray(result.content)) {
for (const item of result.content) {
if (item.type === "text") {
// Parse orchestration artifact from JSON
try {
const artifact = JSON.parse(item.text);
this.events.emit("artifact", artifact);
} catch {
// Plain text response
this.events.emit("message", item.text);
}
}
}
}
return result;
}
on(event: string, handler: (...args: any[]) => void) {
this.events.on(event, handler);
}
}
// Usage with progress tracking
const client = new StreamingOrchexClient();
await client.connect();
client.on("artifact", (artifact) => {
console.log(`Wave ${artifact.currentWave}/${artifact.totalWaves}`);
console.log(`Stream: ${artifact.currentStream?.id}`);
console.log(`Status: ${artifact.currentStream?.status}`);
});
client.on("message", (text) => {
console.log("Server message:", text);
});
await client.executeWithProgress("auto");

Error Handling
class RobustOrchexClient extends OrchexMCPClient {
async executeOrchestrationSafely(mode: string = "auto") {
try {
const result = await this.executeOrchestration(mode);
// Check if execution succeeded
if (result.isError) {
throw new Error(`Execution failed: ${result.content}`);
}
// Parse and validate artifact
const content = result.content[0];
if (content.type !== "text") {
throw new Error("Unexpected response format");
}
const artifact = JSON.parse(content.text);
// Check for failed streams
const failedStreams = artifact.streams.filter(
(s: any) => s.status === "failed"
);
if (failedStreams.length > 0) {
console.warn(`${failedStreams.length} streams failed`);
for (const stream of failedStreams) {
console.error(`- ${stream.id}: ${stream.error}`);
}
}
return artifact;
} catch (error) {
console.error("Execution error:", error);
throw error;
}
}
}

Python Implementation
Installation
pip install mcp

Basic Client
import asyncio
import json
import os
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import Optional, Literal
class OrchexMCPClient:
def __init__(
self,
provider: Optional[Literal['anthropic', 'openai', 'gemini', 'ollama']] = None,
api_key: Optional[str] = None,
ollama_base_url: Optional[str] = None
):
self.session = None
self.exit_stack = None
self.provider = provider
self.api_key = api_key
self.ollama_base_url = ollama_base_url
async def connect(self):
"""Connect to Orchex MCP server with provider configuration"""
# Build environment for provider
env = dict(os.environ)
if self.provider:
env['ORCHEX_PROVIDER'] = self.provider
if self.api_key:
if self.provider == 'anthropic':
env['ANTHROPIC_API_KEY'] = self.api_key
elif self.provider == 'openai':
env['OPENAI_API_KEY'] = self.api_key
elif self.provider == 'gemini':
env['GEMINI_API_KEY'] = self.api_key
if self.ollama_base_url:
env['OLLAMA_BASE_URL'] = self.ollama_base_url
server_params = StdioServerParameters(
command="npx",
args=["-y", "@wundam/orchex"],
env=env,
)
# stdio_client is an async context manager; keep it open with an exit stack
from contextlib import AsyncExitStack
self.exit_stack = AsyncExitStack()
read_stream, write_stream = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.session = await self.exit_stack.enter_async_context(
ClientSession(read_stream, write_stream)
)
# Initialize connection
await self.session.initialize()
print("Connected to Orchex MCP server")
async def list_tools(self):
"""List available tools"""
response = await self.session.list_tools()
return response.tools
async def execute_orchestration(self, mode: str = "auto"):
"""Execute the active orchestration"""
result = await self.session.call_tool(
"execute",
arguments={"mode": mode}
)
return result
async def close(self):
"""Close the connection and shut down the server subprocess"""
if self.exit_stack:
await self.exit_stack.aclose()
self.session = None
# Usage with different providers
async def main():
# OpenAI
openai_client = OrchexMCPClient(
provider='openai',
api_key=os.environ.get('OPENAI_API_KEY')
)
await openai_client.connect()
# Or Anthropic
# claude_client = OrchexMCPClient(
# provider='anthropic',
# api_key=os.environ.get('ANTHROPIC_API_KEY')
# )
# Or Ollama (local)
# ollama_client = OrchexMCPClient(
# provider='ollama',
# ollama_base_url='http://localhost:11434'
# )
# List available tools
tools = await openai_client.list_tools()
print("Available tools:")
for tool in tools:
print(f"- {tool.name}: {tool.description}")
# Execute orchestration
result = await openai_client.execute_orchestration("auto")
print("Execution result:", result)
await openai_client.close()
if __name__ == "__main__":
asyncio.run(main())

With Progress Tracking
import json
from typing import Callable, Optional
class StreamingOrchexClient(OrchexMCPClient):
def __init__(self, **kwargs):
# Forward provider configuration to the base client
super().__init__(**kwargs)
self.on_artifact: Optional[Callable] = None
self.on_message: Optional[Callable] = None
async def execute_with_progress(self, mode: str = "auto"):
"""Execute orchestration with progress callbacks"""
result = await self.execute_orchestration(mode)
# Process result content
if result.content:
for item in result.content:
if item.type == "text":
try:
# Try to parse as artifact JSON
artifact = json.loads(item.text)
if self.on_artifact:
self.on_artifact(artifact)
except json.JSONDecodeError:
# Plain text message
if self.on_message:
self.on_message(item.text)
return result
# Usage
async def main():
client = StreamingOrchexClient()
await client.connect()
# Set up callbacks
def handle_artifact(artifact):
print(f"Wave {artifact['currentWave']}/{artifact['totalWaves']}")
if artifact.get('currentStream'):
stream = artifact['currentStream']
print(f"Stream: {stream['id']} - {stream['status']}")
def handle_message(text):
print(f"Message: {text}")
client.on_artifact = handle_artifact
client.on_message = handle_message
# Execute with progress
await client.execute_with_progress("auto")
await client.close()
asyncio.run(main())

Available MCP Tools
The Orchex MCP server exposes these tools:
`init`
Initialize a new orchestration with a feature name and stream definitions.
Parameters:
- `feature` (string, required): Feature name for the orchestration
- `streams` (object, required): Map of stream ID to stream definition (name, owns, reads, deps, plan, setup, verify)
- `project_dir` (string, optional): Project directory (defaults to cwd)
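Putting those parameters together, an `init` call might look like the sketch below. The feature name, stream IDs, and field values are invented for illustration; the field names come from the parameter list above.

```typescript
// Hypothetical `init` arguments assembled from the parameter list above.
// All values (feature name, paths, plans) are illustrative.
const initArguments = {
  feature: "user-auth",
  streams: {
    api: {
      name: "Auth API endpoints",
      owns: ["src/api/auth/**"],       // files this stream may write
      reads: ["src/db/schema.ts"],     // files it may read but not write
      deps: [],                        // no upstream streams
      plan: "Implement login and signup endpoints",
      verify: "npm test -- auth",
    },
    ui: {
      name: "Login UI",
      owns: ["src/ui/login/**"],
      deps: ["api"],                   // runs in a later wave than `api`
      plan: "Build the login form against the new endpoints",
    },
  },
  project_dir: "/path/to/project",
};

// Passed as: client.callTool({ name: "init", arguments: initArguments })
console.log(Object.keys(initArguments.streams));
```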
`execute`
Run the active orchestration — calls LLM API, applies artifacts, runs verification.
Parameters:
- `mode` (string, optional): `"auto"` (all waves) or `"wave"` (one wave at a time). Default: `"wave"`
- `dry_run` (boolean, optional): Preview without executing. Default: `false`
- `model` (string, optional): LLM model override
Returns: Orchestration progress including stream statuses, artifacts, and token usage.
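Client code typically parses this result from the text content of the tool response. The sketch below uses the artifact fields that appear in the examples throughout this guide (`currentWave`, `totalWaves`, `streams` with per-stream `status`); the real payload may carry additional fields, and the sample data is invented.

```typescript
// Sketch of parsing an `execute` result artifact. The shape mirrors the
// fields used elsewhere in this guide; the sample payload is invented.
interface StreamStatus {
  id: string;
  status: "pending" | "in_progress" | "complete" | "failed";
  error?: string;
}
interface OrchestrationArtifact {
  currentWave: number;
  totalWaves: number;
  streams: StreamStatus[];
}

// Stand-in for result.content[0].text from a real tool call.
const raw = JSON.stringify({
  currentWave: 2,
  totalWaves: 3,
  streams: [
    { id: "api", status: "complete" },
    { id: "ui", status: "failed", error: "verify step exited 1" },
  ],
});

const artifact = JSON.parse(raw) as OrchestrationArtifact;
const failed = artifact.streams.filter((s) => s.status === "failed");
console.log(`${failed.length} failed of ${artifact.streams.length}`);
// → "1 failed of 2"
```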
`status`
Get the current orchestration status and progress.
`complete`
Mark a stream as complete, or archive the entire orchestration.
Parameters:
- `stream_id` (string, optional): Stream ID to mark complete
- `archive` (boolean, optional): Archive the orchestration
`learn`
Parse a planning document and generate parallel stream definitions.
Parameters:
document_path(string, required): Path to markdown planning document
`init-plan`
Generate an annotated plan template optimized for orchex learn.
`recover`
Detect and recover streams stuck in in_progress or failed state.
`reload`
Restart the MCP server to pick up code or config changes.
Common Use Cases
1. CI/CD Pipeline Integration
// Run Orchex in GitHub Actions or GitLab CI
import { OrchexMCPClient } from "./orchex-client";
async function runInCI() {
const client = new OrchexMCPClient();
await client.connect();
try {
const result = await client.executeOrchestration("auto");
// Parse artifact
const artifact = JSON.parse(result.content[0].text);
// Check for failures
const failed = artifact.streams.filter(
(s: any) => s.status === "failed"
);
if (failed.length > 0) {
console.error("Execution failed");
process.exit(1);
}
console.log("✓ All streams completed successfully");
} finally {
await client.close();
}
}

2. Interactive CLI Tool
import asyncio
import sys
from orchex_client import StreamingOrchexClient
async def interactive_execution():
client = StreamingOrchexClient()
await client.connect()
# Show progress bar
def show_progress(artifact):
current = artifact['currentWave']
total = artifact['totalWaves']
percent = (current / total) * 100
bar = '█' * int(percent / 5) + '░' * (20 - int(percent / 5))
print(f"\r[{bar}] {percent:.0f}% ({current}/{total} waves)", end='')
client.on_artifact = show_progress
mode = sys.argv[1] if len(sys.argv) > 1 else "auto"
await client.execute_with_progress(mode)
print("\n✓ Execution complete")
await client.close()
asyncio.run(interactive_execution())

3. Web Service Integration
import express from "express";
import { OrchexMCPClient } from "./orchex-client";
const app = express();
app.use(express.json());
// Endpoint to execute manifests
app.post("/api/execute", async (req, res) => {
const { mode = "auto" } = req.body;
const client = new OrchexMCPClient();
try {
await client.connect();
const result = await client.executeOrchestration(mode);
res.json({
success: true,
result: JSON.parse(result.content[0].text),
});
} catch (error) {
res.status(500).json({
success: false,
error: error.message,
});
} finally {
await client.close();
}
});
app.listen(3000, () => {
console.log("Orchex API server running on port 3000");
});

Best Practices
Connection Management
// ✓ Good: Reuse one connected client for multiple operations
const client = new OrchexMCPClient();
await client.connect();
try {
await client.executeOrchestration("wave"); // run one wave
await client.executeOrchestration("wave"); // then the next
} finally {
await client.close();
}
// ✗ Bad: Spawn a new server process for each operation
for (let i = 0; i < waveCount; i++) {
const client = new OrchexMCPClient();
await client.connect();
await client.executeOrchestration("wave");
await client.close(); // Wasteful: restarts the server every time
}

Error Handling
// Always handle both connection and execution errors
try {
await client.connect();
} catch (error) {
console.error("Failed to connect to Orchex server:", error);
// Check if orchex is installed
// Check if server is accessible
throw error;
}
try {
const result = await client.executeOrchestration("auto");
} catch (error) {
console.error("Orchestration execution failed:", error);
// Check that an orchestration was initialized (via init or learn)
// Check provider API key configuration
throw error;
}

Resource Cleanup
# Use context managers for automatic cleanup
from contextlib import asynccontextmanager
@asynccontextmanager
async def orchex_client():
client = OrchexMCPClient()
try:
await client.connect()
yield client
finally:
await client.close()
# Usage
async def main():
async with orchex_client() as client:
await client.execute_orchestration("auto")
# Client automatically closed

Debugging
Enable Verbose Logging
// TypeScript: Log all MCP messages
const transport = new StdioClientTransport({
command: "npx",
args: ["-y", "@wundam/orchex"],
});
transport.onmessage = (message) => {
console.log("→", JSON.stringify(message, null, 2));
};
transport.onerror = (error) => {
console.error("✗", error);
};

# Python: Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("mcp")
logger.setLevel(logging.DEBUG)

Common Issues
Server not starting:
# Check if orchex is installed
npx @wundam/orchex --version
# Test server manually
npx -y @wundam/orchex

Connection timeout:
// Increase the timeout: the MCP SDK takes a per-request timeout
// (in milliseconds) as the third argument to callTool
const result = await client.callTool(
{ name: "execute", arguments: { mode: "auto" } },
undefined,
{ timeout: 30000 } // 30 seconds
);

Invalid responses:
// Validate response structure
if (!result.content || !Array.isArray(result.content)) {
throw new Error("Invalid response format");
}

Next Steps
- MCP Server Reference - Detailed server API documentation
- REST API - Alternative HTTP-based API
- Node.js SDK - Higher-level programmatic access
- CI/CD Integration - Automate manifest execution
Community
Have questions or want to share your MCP client implementation?
- Support: orchex.dev/support
- Email: support@orchex.dev