Get your LangChain or LangGraph agents reporting into agentwach in under five minutes.
Sign up — a workspace and starter API key are provisioned automatically. Find your key in Settings → API Keys.
Send any agent event with your API key in the x-api-key header. The endpoint accepts two kinds: event (agent activity) and message (inter-agent communication).
POST /api/public/ingest
x-api-key: as_xxxxxxxxxxxxxxxx
Content-Type: application/json
{
"kind": "event",
"agent": { "external_id": "search_alpha", "name": "Search Agent", "type": "langgraph" },
"status": "running",
"current_task": "scraping SEC filings",
"event": { "type": "tool_call", "name": "web_search", "data": { "query": "NVDA Q4" } }
}Drop this handler into your existing LangChain pipeline. No re-architecting required.
import os, time, requests
from langchain.callbacks.base import BaseCallbackHandler
AGENTWACH_URL = "https://YOUR_PROJECT.lovable.app/api/public/ingest"
AGENTWACH_KEY = os.environ["AGENTWACH_API_KEY"]
class agentwachHandler(BaseCallbackHandler):
def __init__(self, agent_name: str, agent_type: str = "langchain"):
self.agent_name = agent_name
self.agent_type = agent_type
self.external_id = agent_name.lower().replace(" ", "_")
def _post(self, body):
requests.post(
AGENTWACH_URL,
headers={"x-api-key": AGENTWACH_KEY, "Content-Type": "application/json"},
json=body, timeout=2,
)
def on_chain_start(self, serialized, inputs, **kwargs):
self._post({
"kind": "event",
"agent": {"external_id": self.external_id, "name": self.agent_name, "type": self.agent_type},
"status": "running",
"current_task": str(inputs)[:120],
"event": {"type": "chain_start", "name": serialized.get("name"), "data": inputs},
})
def on_chain_end(self, outputs, **kwargs):
self._post({
"kind": "event",
"agent": {"external_id": self.external_id, "name": self.agent_name, "type": self.agent_type},
"status": "idle",
"event": {"type": "chain_end", "data": outputs},
})
def on_tool_start(self, serialized, input_str, **kwargs):
self._post({
"kind": "event",
"agent": {"external_id": self.external_id, "name": self.agent_name, "type": self.agent_type},
"status": "running",
"current_task": f"tool: {serialized.get('name')}",
"event": {"type": "tool_call", "name": serialized.get("name"), "data": {"input": input_str}},
})
# Send a message from one agent to another
def send_message(from_name, to_name, content):
requests.post(AGENTWACH_URL, headers={"x-api-key": AGENTWACH_KEY},
json={"kind": "message", "from": from_name, "to": to_name, "content": content})
Open the dashboard. Agents appear as soon as their first event arrives. Status, current task, and message stream update live.