LangChain-Drasi

LangChain-Drasi enables building reactive, event-driven AI agents by bridging external data changes with LangGraph workflows. Drasi continuous queries stream real-time updates that trigger agent state transitions, modify memory, or dynamically control workflow execution—transforming static agents into long-lived, responsive systems.

Overview

langchain-drasi connects LangChain/LangGraph agents to Drasi continuous queries, enabling AI agents to:

  • Discover available Drasi queries
  • Read current query results
  • Subscribe to real-time query updates
  • React to changes via notification handlers

Installation

pip install langchain-drasi

Development Installation

This project uses uv for fast dependency management.

# Clone the repository
git clone https://github.com/drasi-project/langchain-drasi.git
cd langchain-drasi

# Install with development dependencies
uv sync

# Or install without dev dependencies
make install

Quick Start

from langchain_drasi import create_drasi_tool, MCPConnectionConfig, ConsoleHandler

# Configure HTTP connection to remote Drasi MCP server
config = MCPConnectionConfig(
    server_url="http://localhost:8083",  # Default Drasi MCP server URL
    headers={"Authorization": "Bearer your-token"},  # Optional authentication
    timeout=30.0
)

# Create notification handler
handler = ConsoleHandler()

# Create the tool
tool = create_drasi_tool(
    mcp_config=config,
    notification_handlers=[handler]
)

# Use with LangChain agents (requires langchain <1.0)
from langchain import hub
from langchain.agents import AgentExecutor, create_react_agent
from langchain_openai import AzureChatOpenAI

llm = AzureChatOpenAI(
    azure_deployment="gpt-4o-mini",
    temperature=0
)

prompt = hub.pull("hwchase17/react-chat")
agent = create_react_agent(llm, [tool], prompt)
agent_executor = AgentExecutor(agent=agent, tools=[tool])

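# Agent can now discover and read Drasi queries (await requires a coroutine)
import asyncio

async def main() -> None:
    result = await agent_executor.ainvoke({"input": "What queries are available?"})
    print(result["output"])

asyncio.run(main())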

Features

🔍 Query Discovery

Agents can discover available Drasi queries automatically:

queries = await tool.discover_queries()
# Returns: [QueryInfo, QueryInfo, ...]

📖 Query Reading

Read current results from any Drasi query:

result = await tool.read_query("active-orders")
# Returns: QueryResult with current data

🔔 Real-time Subscriptions

Subscribe to query updates and handle changes:

await tool.subscribe("hot-freezers")
# Notifications routed to registered handlers
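
How notifications reach handlers can be pictured as a small dispatcher. The sketch below is not the library's implementation: `Notification`, `dispatch`, `RecordingHandler`, and the `"added"`/`"updated"`/`"deleted"` kind strings are assumptions made for illustration. Only the three `on_result_*` callback names are taken from this README.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Notification:
    """Hypothetical change event; the real payload shape may differ."""
    query_name: str
    kind: str  # assumed values: "added", "updated", "deleted"
    data: dict[str, Any]


class RecordingHandler:
    """Stand-in handler exposing the three documented callback names."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str, dict[str, Any]]] = []

    def on_result_added(self, query_name: str, added_data: dict) -> None:
        self.calls.append(("added", query_name, added_data))

    def on_result_updated(self, query_name: str, updated_data: dict) -> None:
        self.calls.append(("updated", query_name, updated_data))

    def on_result_deleted(self, query_name: str, deleted_data: dict) -> None:
        self.calls.append(("deleted", query_name, deleted_data))


def dispatch(notification: Notification, handlers: list[Any]) -> None:
    """Fan one notification out to every handler's matching callback."""
    method_name = f"on_result_{notification.kind}"
    for handler in handlers:
        callback = getattr(handler, method_name, None)
        if callback is not None:
            callback(notification.query_name, notification.data)


handler = RecordingHandler()
dispatch(Notification("hot-freezers", "added", {"id": "f1", "temp": 34}), [handler])
dispatch(Notification("hot-freezers", "deleted", {"id": "f1"}), [handler])
```

Every registered handler sees every notification, so handlers should stay independent of one another and return quickly.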

🎯 Built-in Handlers

Six ready-to-use notification handlers:

  • ConsoleHandler: Prints notifications to stdout with formatting
  • LoggingHandler: Logs notifications using Python logging
  • MemoryHandler: Stores notifications in memory for analysis
  • BufferHandler: Stores notifications in a FIFO queue for sequential consumption
  • LangChainMemoryHandler: Automatically injects notifications into LangChain conversation memory
  • LangGraphMemoryHandler: Automatically injects notifications into LangGraph checkpoints

🛠️ Custom Handlers

Implement your own notification handlers:

from langchain_drasi import BaseDrasiNotificationHandler

class MyHandler(BaseDrasiNotificationHandler):
    def on_result_added(self, query_name: str, added_data: dict) -> None:
        # Custom logic for new results
        self.save_to_database(query_name, added_data)

    def on_result_updated(self, query_name: str, updated_data: dict) -> None:
        # Custom logic for updates
        self.update_cache(query_name, updated_data)

    def on_result_deleted(self, query_name: str, deleted_data: dict) -> None:
        # Custom logic for deletions
        self.remove_from_cache(query_name, deleted_data)
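
The `MyHandler` example calls helpers such as `save_to_database` that are left for you to supply. As one possible shape, the sketch below persists each event to SQLite using only the standard library. `SqliteEventStore`, its table layout, and the in-memory database are hypothetical choices, not part of langchain-drasi; a handler method could delegate to `store.save(...)`.

```python
import json
import sqlite3
from typing import Any


class SqliteEventStore:
    """Hypothetical persistence helper a custom handler could delegate to."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS events ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "query_name TEXT NOT NULL, kind TEXT NOT NULL, payload TEXT NOT NULL)"
        )

    def save(self, query_name: str, kind: str, data: dict[str, Any]) -> None:
        # Store the payload as JSON so arbitrary result rows fit one column.
        self.conn.execute(
            "INSERT INTO events (query_name, kind, payload) VALUES (?, ?, ?)",
            (query_name, kind, json.dumps(data, sort_keys=True)),
        )
        self.conn.commit()

    def load(self, query_name: str) -> list[tuple[str, dict[str, Any]]]:
        # Return events for one query in insertion order.
        rows = self.conn.execute(
            "SELECT kind, payload FROM events WHERE query_name = ? ORDER BY id",
            (query_name,),
        ).fetchall()
        return [(kind, json.loads(payload)) for kind, payload in rows]


store = SqliteEventStore()
store.save("active-orders", "added", {"order_id": 7, "status": "open"})
store.save("active-orders", "updated", {"order_id": 7, "status": "shipped"})
```

If handler callbacks run on a different thread than the one that opened the connection, the connection setup would need adjusting; that detail is outside this sketch.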

Examples

See the examples/ directory for complete working examples:

Chat Examples (examples/chat/)

Interactive ReAct agents demonstrating automatic notification memory:

  • langchain_react.py: LangChain ReAct agent with LangChainMemoryHandler
  • langgraph_react.py: LangGraph ReAct agent with LangGraphMemoryHandler
  • Use case: Freezer temperature monitoring with real-time alerts

Invisible Hide and Seek Game (examples/hide_and_seek/)

Complex LangGraph agent demonstrating custom workflows and notification handling:

  • Custom LangGraph state machine that integrates Drasi tool
  • BufferHandler for processing real-time player positions
  • Use case: AI seeker agents find invisible hiders using Drasi continuous queries

Basic usage demonstrating core functionality

API Reference

Main Functions

create_drasi_tool()

Factory function to create a DrasiTool instance.

Parameters:

  • mcp_config (MCPConnectionConfig): MCP connection configuration
  • notification_handlers (list[DrasiNotificationHandler], optional): Notification handlers

Returns: DrasiTool instance

Configuration

MCPConnectionConfig

Pydantic model for HTTP-based MCP server connection configuration.

Fields:

  • server_url (str): HTTP/HTTPS URL of the Drasi MCP server
  • headers (dict[str, str], optional): HTTP headers for authentication
  • timeout (float): Request timeout in seconds (default: 30.0)
  • reconnect_policy (ReconnectPolicy): Reconnection settings
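
In deployments, connection settings usually come from the environment rather than source code. The helper below is a sketch that builds keyword arguments for the documented `server_url`, `headers`, and `timeout` fields. The function name and the environment variable names (`DRASI_MCP_URL`, `DRASI_MCP_TOKEN`, `DRASI_MCP_TIMEOUT`) are hypothetical and not defined by langchain-drasi.

```python
import os
from typing import Any, Mapping
from urllib.parse import urlparse


def mcp_config_kwargs(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Build kwargs for MCPConnectionConfig from (hypothetical) env vars."""
    server_url = env.get("DRASI_MCP_URL", "http://localhost:8083")
    if urlparse(server_url).scheme not in ("http", "https"):
        raise ValueError(f"server_url must be HTTP or HTTPS, got {server_url!r}")

    kwargs: dict[str, Any] = {
        "server_url": server_url,
        "timeout": float(env.get("DRASI_MCP_TIMEOUT", "30.0")),
    }
    token = env.get("DRASI_MCP_TOKEN")
    if token:
        # Only send an Authorization header when a token is configured.
        kwargs["headers"] = {"Authorization": f"Bearer {token}"}
    return kwargs


example = mcp_config_kwargs(
    {"DRASI_MCP_URL": "https://drasi.example.com", "DRASI_MCP_TOKEN": "abc"}
)
# With the package installed: config = MCPConnectionConfig(**example)
```

Passing the environment as a mapping keeps the helper easy to test without touching real process variables.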

Handlers

LoggingHandler

Logs notifications using Python's logging framework.

import logging

from langchain_drasi import LoggingHandler

handler = LoggingHandler(
    logger_name="drasi.notifications",
    log_level=logging.INFO
)

ConsoleHandler

Prints notifications to console with formatted output.

from langchain_drasi import ConsoleHandler

handler = ConsoleHandler()

# Use with create_drasi_tool
tool = create_drasi_tool(
    mcp_config=config,
    notification_handlers=[handler]
)

MemoryHandler

Stores notifications in memory.

from langchain_drasi import MemoryHandler

handler = MemoryHandler(max_size=100)

# Retrieve notifications
all_notifs = handler.get_all()
freezer_notifs = handler.get_by_query("freezerx")
added_events = handler.get_by_type("added")

BufferHandler

Stores notifications in a FIFO queue for sequential consumption. This is useful for buffering incoming change notifications while your workflow is busy with another step.

from langchain_drasi import BufferHandler

handler = BufferHandler(max_size=50)

# Use with create_drasi_tool
tool = create_drasi_tool(
    mcp_config=config,
    notification_handlers=[handler]
)

# Consume notifications one at a time
while not handler.is_empty():
    notification = handler.consume()
    process_notification(notification)

# Or peek without consuming
next_notif = handler.peek()

# Check buffer status
current_size = handler.size()
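
A common way to use the buffer is to drain it between workflow steps, capping how many notifications each step handles. The sketch below relies only on the `is_empty()` and `consume()` methods shown above. The `drain` helper and the `FakeBuffer` stand-in (used so the snippet runs without a Drasi server) are hypothetical.

```python
from collections import deque
from typing import Any, Protocol


class BufferLike(Protocol):
    """The two BufferHandler methods this sketch depends on."""

    def is_empty(self) -> bool: ...
    def consume(self) -> Any: ...


class FakeBuffer:
    """Stand-in FIFO buffer for local experimentation, not the real handler."""

    def __init__(self, items: list[Any]) -> None:
        self._items = deque(items)

    def is_empty(self) -> bool:
        return not self._items

    def consume(self) -> Any:
        return self._items.popleft()


def drain(buffer: BufferLike, limit: int) -> list[Any]:
    """Take up to `limit` notifications in arrival order, leaving the rest queued."""
    batch: list[Any] = []
    while len(batch) < limit and not buffer.is_empty():
        batch.append(buffer.consume())
    return batch


buffer = FakeBuffer(["n1", "n2", "n3"])
first_batch = drain(buffer, limit=2)   # oldest two notifications
second_batch = drain(buffer, limit=2)  # whatever is left
```

Capping the batch size keeps a burst of changes from starving the rest of the workflow; anything left over is picked up on the next pass.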

LangChainMemoryHandler

Automatically injects notifications into LangChain conversation memory as system messages.

from langchain.memory import ConversationBufferMemory
from langchain_drasi import LangChainMemoryHandler

memory = ConversationBufferMemory(
    memory_key="chat_history",
    input_key="input",
    output_key="output",
)

handler = LangChainMemoryHandler(memory)

# Notifications are automatically added to conversation memory
tool = create_drasi_tool(
    mcp_config=config,
    notification_handlers=[handler]
)

See examples/chat/langchain_react.py for a complete example.
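
To make "injected as system messages" concrete, the sketch below renders a change event as a plain role/content message. The `format_notification` helper, the message wording, and the dict-based message are assumptions for illustration; the handler's actual message format may differ.

```python
import json
from typing import Any


def format_notification(query_name: str, kind: str, data: dict[str, Any]) -> dict[str, str]:
    """Render a change event as a system-style chat message (illustrative format)."""
    payload = json.dumps(data, sort_keys=True)
    return {
        "role": "system",
        "content": f"[Drasi] Query '{query_name}' result {kind}: {payload}",
    }


message = format_notification("hot-freezers", "added", {"id": "freezer-3", "temp": 38})
```

Keeping the text short and structured helps the model cite the change in later turns without crowding the context window.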

LangGraphMemoryHandler

Automatically injects notifications into LangGraph checkpoints as system messages.

from langgraph.checkpoint.memory import MemorySaver
from langchain_drasi import LangGraphMemoryHandler

memory = MemorySaver()
thread_id = "my-conversation"

handler = LangGraphMemoryHandler(memory, thread_id)

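# Create the Drasi tool so notifications reach the handler
drasi_tool = create_drasi_tool(
    mcp_config=config,
    notification_handlers=[handler]
)

# Create agent with wrapped checkpointer (llm is a chat model, as in Quick Start)
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(
    model=llm,
    tools=[drasi_tool],
    checkpointer=handler.checkpointer,  # Use wrapped checkpointer
)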

See examples/chat/langgraph_react.py for a complete example.

Development

Running Tests

# Run all tests (including integration)
make test

# Run tests excluding integration tests
make test-fast

# Run unit tests only
make test-unit

# Run integration tests only
make test-integration

Code Quality

# Format code
make format

# Run linting checks (ruff + mypy + pyright)
make lint

# Run type checking only
make typecheck

Available Make Targets

Run make help to see all available commands.

Requirements

  • Python 3.11+
  • LangChain Core >=0.1.0
  • LangGraph >=0.1.0
  • MCP SDK >=1.0.0
  • Pydantic >=2.0.0

Note: Examples using LangChain's legacy APIs (agents, memory, hub) require LangChain <1.0. For LangChain 1.0+, use LangGraph-based workflows.

Use Case Examples

1. Realtime Knowledge Agents

Example: AI Trading or News Monitoring Agent

Build agents that maintain evolving understanding of companies or topics. When new market data, filings, or news arrives, Drasi continuous queries detect changes and push updates into LangGraph memory via notification handlers.

The workflow can:

  • Trigger summarization nodes
  • Re-evaluate trading strategy nodes
  • Send alerts when thresholds are crossed

Key benefits: Async events changing agent reasoning mid-execution

2. Collaborative AI Co-Pilots

Example: Project Management Assistant (Jira + Slack Integration)

Create agents that manage "plan and execute" loops. LangChain-Drasi streams updates when:

  • New tickets are created
  • Teammates comment
  • Deployment pipelines fail

The agent dynamically adjusts workflows to:

  • Reassign tasks
  • Summarize recent changes
  • Notify relevant stakeholders

Key benefits: Integration with human workflows and reactive decision making

3. IoT or Environment-Aware Agents

Example: Smart Home / Facility Monitoring Agent

Implement agents following "Observe → Diagnose → Act" patterns. LangChain-Drasi streams sensor data (temperature, occupancy, motion) from Drasi queries, enabling agents to receive events like:

  • "Temperature > 90°F in server room"
  • "Door left open after 10PM"

These trigger subgraph actions such as notifying staff or adjusting systems.

Key benefits: Event-driven control loops with context persistence
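
The "Observe → Diagnose → Act" pattern can be sketched with a plain asyncio queue standing in for the notification stream. Everything here is hypothetical: the event fields, the `diagnose` rules, and the action names are invented for illustration, and in a real agent the queue would be fed by a notification handler and the actions would be graph nodes.

```python
import asyncio
from typing import Any, Optional


def diagnose(event: dict[str, Any]) -> Optional[str]:
    """Map a sensor reading to an action name, or None when nothing is wrong."""
    if event.get("sensor") == "temperature" and event.get("value_f", 0) > 90:
        return "notify_staff"
    if event.get("sensor") == "door" and event.get("open") and event.get("hour", 0) >= 22:
        return "lock_door"
    return None


async def control_loop(events: asyncio.Queue, actions: list[str]) -> None:
    """Observe events until a None sentinel arrives, acting on each diagnosis."""
    while True:
        event = await events.get()      # Observe
        if event is None:
            break
        action = diagnose(event)        # Diagnose
        if action is not None:
            actions.append(action)      # Act (recorded here instead of executed)


async def demo() -> list[str]:
    events: asyncio.Queue = asyncio.Queue()
    actions: list[str] = []
    for event in (
        {"sensor": "temperature", "room": "server", "value_f": 95},
        {"sensor": "temperature", "room": "lobby", "value_f": 70},
        {"sensor": "door", "open": True, "hour": 23},
        None,  # sentinel that ends the loop
    ):
        events.put_nowait(event)
    await control_loop(events, actions)
    return actions


actions_taken = asyncio.run(demo())
```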

4. Customer Support or CRM AI

Example: Proactive Customer Agent

Build agents that track ongoing support tickets and dynamically respond to external updates (customer replies, sentiment scores, transaction data) streamed via Drasi.

The agent can:

  • Update its mental model of customers
  • Suggest next actions
  • Flag escalation workflows

Key benefits: Dynamic memory updates and priority re-ranking based on streaming data
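
Priority re-ranking from streamed updates can be sketched as a small state merge followed by a sort. The ticket fields, the `rerank` helper, and the urgency formula below are hypothetical; a real agent would choose its own signals and weights.

```python
from typing import Any


def rerank(tickets: dict[str, dict[str, Any]], update: dict[str, Any]) -> list[str]:
    """Merge one streamed update into ticket state and return IDs by urgency."""
    ticket = tickets.setdefault(
        update["ticket_id"], {"sentiment": 0.0, "waiting_hours": 0}
    )
    ticket.update({k: v for k, v in update.items() if k != "ticket_id"})

    def urgency(ticket_id: str) -> float:
        state = tickets[ticket_id]
        # Lower (more negative) sentiment and longer waits raise urgency.
        return -state["sentiment"] + 0.1 * state["waiting_hours"]

    return sorted(tickets, key=urgency, reverse=True)


tickets = {
    "T-1": {"sentiment": 0.4, "waiting_hours": 2},
    "T-2": {"sentiment": -0.2, "waiting_hours": 1},
}
before = rerank(tickets, {"ticket_id": "T-2", "waiting_hours": 1})
after = rerank(tickets, {"ticket_id": "T-1", "sentiment": -0.9})
```

Here a single negative sentiment update moves `T-1` ahead of `T-2`, which is the kind of mid-conversation reprioritization the use case describes.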

5. Game or Simulation AI

Example: Dynamic NPC or Dungeon Master Agent

Create agents where LangGraph models narrative or game logic, while LangChain-Drasi feeds real-time game state:

  • Player positions
  • Inventory changes
  • Player chat inputs

The AI responds with adaptive storylines or strategic NPC behaviors.

Key benefits: Continuous interaction loops and event-driven storytelling

6. DevOps or Observability Assistant

Example: LLM-Augmented Ops Monitor

Build agents that follow response patterns (detect → diagnose → remediate). Drasi queries monitor logs, metrics, or alerts, and LangChain-Drasi routes these updates into the agent's context, triggering:

  • Log analysis
  • Hypothesis generation
  • Action nodes (restart service, notify engineer)

Key benefits: Event-triggered reasoning pipelines integrating with infrastructure telemetry

7. Realtime Collaborative Editing / Chat Agents

Example: Async Group Assistant

Develop agents for multi-user scenarios where users edit or discuss in real-time. LangChain-Drasi receives streaming edits, comments, or conversation events, enabling agents to:

  • Maintain global context
  • Offer live suggestions
  • Adjust strategies collaboratively

Key benefits: Multi-user event synchronization and async context adaptation

License

Apache License 2.0 - see LICENSE file for details.
