Building Peer Agents
A PeerAgent is a full-duplex agent that can both host inbound requests (act as a server) AND call other agents (act as a client). This enables sophisticated multi-agent architectures like orchestrators, routers, and aggregators.
Overview
PeerAgent is the most powerful building block in the @a2aletheia/a2a SDK. It combines:
- Inbound capabilities — Respond to requests from other agents via A2A protocol
- Outbound capabilities — Call other agents with trust verification
Common use cases:
- Orchestrator agents — Route tasks to specialist agents based on content
- Aggregator agents — Combine results from multiple agents
- Gateway agents — Translate between protocols or add middleware
- Coordinator agents — Manage workflows across multiple agents
Architecture
PeerAgent is a composition of two core components:
┌─────────────────────────────────────────────┐
│ PeerAgent │
│ │
│ ┌───────────────┐ ┌───────────────────┐ │
│ │ AletheiaAgent │ │ AletheiaA2A │ │
│ │ (server) │ │ (client) │ │
│ │ │ │ │ │
│ │ • handle() │ │ • discover() │ │
│ │ • onCancel() │ │ • connect() │ │
│ │ • handleReq() │ │ • sendByCap() │ │
│ │ • start() │ │ • streamByCap() │ │
│ └───────────────┘ └───────────────────┘ │
│ ▲ ▲ │
│ │ │ │
│ Inbound traffic Outbound traffic │
└─────────────────────────────────────────────┘
- AletheiaAgent — Handles incoming A2A protocol requests, manages task lifecycle
- AletheiaA2A — Discovers and calls other agents with trust verification
Creating a PeerAgent
PeerAgentConfig Interface
import { PeerAgent, type PeerAgentConfig } from "@a2aletheia/a2a";
const config: PeerAgentConfig = {
// Registry URL (shared by server and client)
registryUrl: "https://registry.aletheia.dev",
// Server-side configuration (AletheiaAgent)
name: "Orchestrator",
version: "1.0.0",
url: "https://orchestrator.example.com",
description: "Routes tasks to specialist agents",
skills: [
{
id: "orchestrate",
name: "orchestrate-tasks",
description: "Route tasks to appropriate specialist agents",
tags: ["orchestration", "routing"],
},
],
// Optional server config
defaultInputModes: ["text/plain", "application/json"],
defaultOutputModes: ["text/plain", "application/json"],
capabilities: {
streaming: true,
pushNotifications: false,
},
iconUrl: "https://orchestrator.example.com/icon.png",
documentationUrl: "https://docs.example.com/orchestrator",
provider: {
organization: "Example Corp",
url: "https://example.com",
},
aletheiaExtensions: {
did: "did:web:orchestrator.example.com",
owner: "0x1234...",
livenessPingUrl: "https://orchestrator.example.com/health",
},
taskStore: customTaskStore, // Optional: Redis, etc.
// Client-side configuration (AletheiaA2A)
agentSelector: customSelector, // Optional: custom selection logic
minTrustScore: 0.7, // Minimum trust score for outbound calls
requireLive: true, // Only call agents passing liveness checks
livenessCheckBeforeSend: true, // Verify liveness before each call
verifyIdentity: true, // Verify DIDs before connecting
authToken: "your-registry-token", // Optional: for authenticated discovery
// Observability (BYOL - Bring Your Own Logger)
logger: customLogger,
logLevel: "debug",
};
Minimal Example
import { PeerAgent } from "@a2aletheia/a2a";
const peer = new PeerAgent({
name: "Router",
version: "1.0.0",
url: "https://router.example.com",
description: "Routes messages to handlers",
skills: [{ id: "route", name: "route", description: "Route messages", tags: [] }],
minTrustScore: 0.5,
});
Inbound: Handling Requests
Use handle() to register a message handler that processes incoming requests.
AgentHandler Signature
type AgentHandler = (
context: AgentContext,
response: AgentResponse,
) => Promise<void>;
The AgentContext provides access to the incoming message:
interface AgentContext {
readonly taskId: string;
readonly contextId: string;
readonly textContent: string; // All text parts joined
readonly dataContent: Record<string, unknown> | null; // First data part
readonly parts: Part[]; // Raw message parts
readonly message: Message; // Full A2A message
}
ResponseBuilder API
The AgentResponse provides methods for building responses:
Quick Responses (Auto-complete)
peer.handle(async (context, response) => {
// Simple text response
response.text("Hello from the agent!");
// JSON data response
response.data({ result: "success", count: 42 });
// Custom parts
response.message([
{ kind: "text", text: "Here's the result:" },
{ kind: "data", data: { value: 123 } },
]);
});
Streaming Responses
For long-running tasks, use streaming:
peer.handle(async (context, response) => {
// Signal we're working
response.working("Processing your request...");
// Stream artifacts as they're generated
for (const chunk of processLargeData(context.textContent)) {
response.artifact(
{
name: "output.txt",
mimeType: "text/plain",
parts: [{ kind: "text", text: chunk }],
},
{ append: true, lastChunk: false },
);
}
// Mark final artifact chunk
response.artifact(finalArtifact, { lastChunk: true });
// Complete the task
response.done("Processing complete!");
});
Status States
peer.handle(async (context, response) => {
// Working state (non-final)
response.working("Analyzing...");
// Completed (final)
response.done("Analysis complete");
// Failed (final)
response.fail("Something went wrong");
// Canceled (final)
response.canceled();
// Input required (non-final) - pause for user input
response.inputRequired("Please provide additional information");
});
Using onCancel() for Cancellation
Handle task cancellation requests:
peer.handle(async (context, response) => {
const abortController = new AbortController();
// Store controller for cancellation
activeTasks.set(context.taskId, abortController);
try {
const result = await longRunningProcess(
context.textContent,
abortController.signal,
);
response.text(result);
} finally {
activeTasks.delete(context.taskId);
}
});
peer.onCancel(async (taskId, response) => {
const controller = activeTasks.get(taskId);
if (controller) {
controller.abort();
activeTasks.delete(taskId);
}
response.canceled();
});
Outbound: Calling Other Agents
Use client methods within your handler to call other agents.
discover()
Find agents matching criteria:
peer.handle(async (context, response) => {
// Discover by capability
const translators = await peer.discover({
capability: "translate",
minTrustScore: 0.8,
isLive: true,
limit: 5,
});
// Discover by query
const agents = await peer.discover({
query: "code analysis",
limit: 10,
});
response.text(`Found ${translators.length} translators`);
});
connect()
Establish a persistent connection to an agent:
peer.handle(async (context, response) => {
// Connect by DID (with trust verification)
const trustedAgent = await peer.connect(
"did:web:translator.example.com",
);
// Send a message
const result = await trustedAgent.send("Translate this text");
response.text(result.response.parts[0].text);
});
connectByUrl()
Connect to a specific registered agent by URL when you already know its registry URL:
peer.handle(async (context, response) => {
const agent = await peer.connectByUrl(
"https://translator.example.com",
);
const result = await agent.send(context.textContent);
response.text(result.response.parts[0].text);
});
sendByCapability()
One-shot discovery, selection, and send:
peer.handle(async (context, response) => {
// Automatically discovers, selects best agent, and sends
const result = await peer.sendByCapability(
"translate", // Capability to find
context.textContent, // Message (string or MessageInput)
{ timeoutMs: 30000 }, // Options
);
console.log("Trust info:", result.trustInfo);
console.log("Agent:", result.agentName);
response.text(result.response.parts[0].text);
});
streamByCapability()
Stream responses from long-running tasks:
peer.handle(async (context, response) => {
response.working("Streaming from specialist agent...");
for await (const event of peer.streamByCapability(
"analyze",
{ text: context.textContent },
)) {
switch (event.kind) {
case "status-update":
if (event.event.status.state === "working") {
response.working("Agent is working...");
}
break;
case "artifact-update":
response.artifact(event.event.artifact);
break;
}
}
response.done("Streaming complete");
});
Lifecycle Events
Subscribe to lifecycle events for observability:
import type { AletheiaEventType } from "@a2aletheia/sdk";
peer.on("agent.start", (event) => {
console.log("Agent started:", event.data);
});
peer.on("message.received", (event) => {
metrics.increment("messages.received");
});
peer.on("trust.verified", (event) => {
console.log("Trust verified for:", event.data?.agentDid);
});
peer.on("trust.failed", (event) => {
console.error("Trust verification failed:", event.data);
});
// Wildcard - receive all events
peer.on("*", (event) => {
analytics.track(`aletheia.${event.type}`, event.data);
});
Available Event Types
| Event | Description |
|---|---|
agent.start |
Agent server started |
agent.stop |
Agent server stopped |
message.received |
Incoming message received |
message.sent |
Outbound message sent |
message.failed |
Message delivery failed |
trust.verified |
Trust verification succeeded |
trust.failed |
Trust verification failed |
rating.submitted |
Rating submitted to registry |
rating.received |
Rating received from registry |
discovery.search |
Agent discovery initiated |
discovery.connect |
Connected to an agent |
liveness.check |
Liveness check initiated |
liveness.result |
Liveness check result |
Starting the Server
Standalone Express Server
await peer.start(4000);
console.log("PeerAgent listening on port 4000");
Graceful Shutdown
process.on("SIGTERM", () => {
peer.stop();
process.exit(0);
});
Framework Integration
Use handleRequest() for custom HTTP frameworks:
Hono Example
import { Hono } from "hono";
import { PeerAgent } from "@a2aletheia/a2a";
const app = new Hono();
const peer = new PeerAgent({ /* config */ });
peer.handle(async (context, response) => {
response.text("Hello from Hono!");
});
// Handle A2A requests
app.all("/", async (c) => {
const body = await c.req.json();
const result = await peer.handleRequest(body);
// Streaming response
if (Symbol.asyncIterator in Object(result)) {
return new Response(
new ReadableStream({
async start(controller) {
for await (const chunk of result as AsyncGenerator) {
controller.enqueue(
new TextEncoder().encode(JSON.stringify(chunk) + "\n"),
);
}
controller.close();
},
}),
{
headers: {
"Content-Type": "application/jsonl",
},
},
);
}
// Regular response
return c.json(result);
});
// Serve agent card
app.get("/.well-known/agent-card.json", (c) => {
return c.json(peer.getAgentCard());
});
export default app;
Fastify Example
import Fastify from "fastify";
import { PeerAgent } from "@a2aletheia/a2a";
const fastify = Fastify();
const peer = new PeerAgent({ /* config */ });
peer.handle(async (context, response) => {
const result = await peer.sendByCapability("translate", context.textContent);
response.text(result.response.parts[0].text);
});
fastify.post("/", async (request, reply) => {
const result = await peer.handleRequest(request.body);
if (Symbol.asyncIterator in Object(result)) {
reply.type("application/jsonl");
return reply.send(
async function* () {
for await (const chunk of result as AsyncGenerator) {
yield JSON.stringify(chunk) + "\n";
}
}(),
);
}
return result;
});
fastify.get("/.well-known/agent-card.json", async () => {
return peer.getAgentCard();
});
await fastify.listen({ port: 4000 });
Complete Example
Full orchestrator agent that routes to specialist agents:
import {
PeerAgent,
type PeerAgentConfig,
AgentNotFoundError,
} from "@a2aletheia/a2a";
const config: PeerAgentConfig = {
registryUrl: "https://registry.aletheia.dev",
name: "Orchestrator",
version: "1.0.0",
url: "https://orchestrator.example.com",
description: "Routes tasks to specialist agents based on content",
skills: [
{
id: "orchestrate",
name: "orchestrate",
description: "Route tasks to specialist agents",
tags: ["orchestration", "routing", "multi-agent"],
},
],
capabilities: { streaming: true },
aletheiaExtensions: {
did: "did:web:orchestrator.example.com",
livenessPingUrl: "https://orchestrator.example.com/health",
},
minTrustScore: 0.7,
requireLive: true,
livenessCheckBeforeSend: true,
verifyIdentity: true,
logLevel: "info",
};
const peer = new PeerAgent(config);
// Map keywords to capabilities
const ROUTING_RULES: Record<string, string> = {
translate: "translate",
translation: "translate",
analyze: "analyze",
analysis: "analyze",
code: "code-review",
review: "code-review",
summarize: "summarize",
summary: "summarize",
};
function detectCapability(text: string): string | null {
const lower = text.toLowerCase();
for (const [keyword, capability] of Object.entries(ROUTING_RULES)) {
if (lower.includes(keyword)) {
return capability;
}
}
return null;
}
peer.handle(async (context, response) => {
const text = context.textContent;
const capability = detectCapability(text);
if (!capability) {
response.text(
"I can help with translation, analysis, code review, and summarization. " +
"Please specify what you'd like to do.",
);
return;
}
response.working(`Routing to ${capability} specialist...`);
try {
// Stream response from specialist
for await (const event of peer.streamByCapability(capability, text)) {
switch (event.kind) {
case "status-update":
const status = event.event.status.state;
if (status === "working") {
response.working(`${capability} agent is processing...`);
}
break;
case "artifact-update":
response.artifact(event.event.artifact, {
append: event.event.append,
lastChunk: event.event.lastChunk,
});
break;
case "message":
// Forward the message
response.message(event.event.parts);
return;
}
}
response.done(`${capability} task completed`);
} catch (error) {
if (error instanceof AgentNotFoundError) {
response.fail(`No ${capability} agents available. Please try again later.`);
} else {
response.fail(`Error: ${error instanceof Error ? error.message : "Unknown error"}`);
}
}
});
// Optional: Handle cancellation
peer.onCancel(async (taskId, response) => {
console.log(`Task ${taskId} cancelled`);
response.canceled();
});
// Observability
peer.on("message.received", (event) => {
console.log(`Received: ${event.data?.taskId}`);
});
peer.on("trust.failed", (event) => {
console.error(`Trust failed: ${event.data?.reason}`);
});
// Start the server
await peer.start(4000);
console.log("Orchestrator agent running on port 4000");
Escape Hatches
For advanced use cases, access the underlying components:
getAgent()
Get the underlying AletheiaAgent for server-side operations:
const agent = peer.getAgent();
// Access the Express request handler
const handler = agent.getRequestHandler();
// Get the task store
const store = agent.getTaskStore();
// Get agent card
const card = agent.getAgentCard();
getClient()
Get the underlying AletheiaA2A client for advanced client operations:
const client = peer.getClient();
// Clear connection caches
client.clearConnections();
// Disconnect a specific agent
client.disconnectAgent("did:web:agent.example.com");
// Access the logger
client.logger.debug("Custom debug message");
Best Practices
Error Handling in Handlers
Always handle errors gracefully:
peer.handle(async (context, response) => {
try {
const result = await peer.sendByCapability("analyze", context.textContent);
response.text(result.response.parts[0].text);
} catch (error) {
if (error instanceof AgentNotFoundError) {
response.fail("No suitable agents found. Try again later.");
} else if (error instanceof A2AProtocolError) {
response.fail(`Protocol error: ${error.message}`);
} else {
response.fail("An unexpected error occurred");
// Log full error for debugging
peer.getClient().logger.error("Handler error", error);
}
}
});
Context Management
Reuse context IDs for multi-turn conversations:
peer.handle(async (context, response) => {
// First call establishes context
const result1 = await peer.sendByCapability(
"translate",
{ text: context.textContent, contextId: context.contextId },
);
// Follow-up uses same context
const result2 = await peer.sendByCapability(
"translate",
{
text: "Now translate to Spanish",
contextId: result1.response.contextId,
taskId: result1.response.id,
},
);
response.text(result2.response.parts[0].text);
});
Timeout Configuration
Set appropriate timeouts for outbound calls:
const result = await peer.sendByCapability("analyze", input, {
timeoutMs: 60000, // 60 seconds for long analysis
blocking: true, // Wait for completion
});
Resource Cleanup
Always clean up resources on shutdown:
const activeConnections = new Map<string, AbortController>();
peer.onCancel(async (taskId, response) => {
activeConnections.get(taskId)?.abort();
activeConnections.delete(taskId);
response.canceled();
});
process.on("SIGTERM", () => {
// Abort all active tasks
for (const controller of activeConnections.values()) {
controller.abort();
}
peer.stop();
process.exit(0);
});
Streaming Best Practices
When streaming to downstream agents:
peer.handle(async (context, response) => {
response.working("Starting pipeline...");
try {
for await (const event of peer.streamByCapability("analyze", context.textContent)) {
// Don't block the stream with slow operations
if (event.kind === "artifact-update") {
response.artifact(event.event.artifact);
}
}
response.done();
} catch (error) {
// Use fail() to properly terminate the stream
response.fail(error instanceof Error ? error.message : "Stream failed");
}
});