Platform Extensions
⚠️ ROADMAP FEATURE - NOT YET AVAILABLE
This document describes AgenticFlow's planned Platform Extensions framework for developers. This comprehensive extension system is currently under development and not yet available.
Current Status: Design & Planning Phase | Availability: To be announced | For Updates: Contact [email protected]
Planned Extension Framework
The planned AgenticFlow Platform Extensions will enable developers to build powerful extensions that deeply integrate with the core platform, from custom MCP servers to workflow engines and enterprise integrations.
Platform Extensions Overview
What Are Platform Extensions:
- System-Level Components - Extend core platform functionality
- Deep Integration - Hook into AgenticFlow's internal systems
- Enterprise Features - Build enterprise-grade extensions
- Ecosystem Contribution - Contribute to the broader AgenticFlow platform
- Custom Protocols - Implement custom communication protocols
Extension Types:
from agenticflow.extensions import PlatformExtension
class MyPlatformExtension(PlatformExtension):
    """Minimal example of a platform extension (custom enterprise auth).

    ``setup_enterprise_auth`` and ``process_auth_request`` are not defined in
    this snippet; they stand for the extension author's own logic.
    """

    def __init__(self):
        super().__init__(
            name="custom_enterprise_auth",
            category="authentication",
            version="1.0.0",
        )

    async def initialize(self, platform_context):
        """Initialize extension with platform context.

        Returns:
            True once setup has finished. The base class declares this method
            as returning ``bool`` and the extension loader only registers an
            extension whose ``initialize`` result is truthy.
        """
        await self.setup_enterprise_auth(platform_context)
        return True

    async def shutdown(self):
        """Shutdown the extension gracefully.

        ``shutdown`` is abstract on ``PlatformExtension``; without an
        implementation this class could not be instantiated.
        """
        return True

    async def handle_request(self, request, context):
        """Handle incoming requests by delegating to the auth request logic."""
        return await self.process_auth_request(request, context)
Extension Architecture
Core Extension Framework
Base Extension Class:
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from agenticflow.platform import PlatformContext
from agenticflow.events import EventBus
from agenticflow.config import ExtensionConfig
class PlatformExtension(ABC):
    """Base class for all platform extensions.

    Subclasses must implement :meth:`initialize` and :meth:`shutdown`.
    ``platform_context``, ``event_bus`` and ``config`` start as ``None`` and
    are assigned later (``config`` by :meth:`configure`).

    Platform types in signatures are written as quoted forward references so
    they are not evaluated when the class is defined.
    """

    def __init__(
        self,
        name: str,
        category: str,
        version: str = "1.0.0",
        description: str = "",
        author: str = "",
        dependencies: Optional[List[str]] = None,
    ):
        """Store the extension's identity and reset its runtime state.

        Args:
            name: Extension name; also used in emitted event names.
            category: Extension category reported in the manifest.
            version: Version string reported in manifest and health check.
            description: Free-text description.
            author: Author reported in the manifest.
            dependencies: Names of required extensions; ``None`` means none.
        """
        self.name = name
        self.category = category
        self.version = version
        self.description = description
        self.author = author
        # Copy so later mutation of the caller's list cannot leak in here.
        self.dependencies = list(dependencies) if dependencies else []

        # Platform integration
        self.platform_context: Optional["PlatformContext"] = None
        self.event_bus: Optional["EventBus"] = None
        self.config: Optional["ExtensionConfig"] = None

        # Extension state
        self.initialized = False
        self.enabled = True
        self.metrics = {}

    @abstractmethod
    async def initialize(self, platform_context: "PlatformContext") -> bool:
        """Initialize the extension with platform context"""

    @abstractmethod
    async def shutdown(self) -> bool:
        """Shutdown the extension gracefully"""

    async def configure(self, config: Dict[str, Any]) -> bool:
        """Wrap ``config`` in an ``ExtensionConfig`` and store it; returns True."""
        self.config = ExtensionConfig(config)
        return True

    async def health_check(self) -> Dict[str, Any]:
        """Return a status snapshot.

        ``status`` is ``"healthy"`` while ``enabled`` is true and
        ``"disabled"`` otherwise. ``metrics`` is a shallow copy, so callers
        cannot mutate the extension's own dict.
        """
        return {
            "name": self.name,
            "version": self.version,
            "status": "healthy" if self.enabled else "disabled",
            "initialized": self.initialized,
            "metrics": dict(self.metrics),
        }

    def get_manifest(self) -> Dict[str, Any]:
        """Return the extension's static metadata (dependencies are copied)."""
        return {
            "name": self.name,
            "category": self.category,
            "version": self.version,
            "description": self.description,
            "author": self.author,
            "dependencies": list(self.dependencies),
        }

    # Event system integration
    async def emit_event(self, event_type: str, data: Dict[str, Any]):
        """Emit ``extension.<name>.<event_type>`` on the event bus.

        Does nothing when no event bus has been attached.
        """
        if self.event_bus:
            await self.event_bus.emit(f"extension.{self.name}.{event_type}", data)

    async def subscribe_to_event(self, event_type: str, handler):
        """Subscribe ``handler`` to ``event_type``; no-op without an event bus."""
        if self.event_bus:
            await self.event_bus.subscribe(event_type, handler)
class ExtensionManager:
    """Manage platform extensions: load, unload and health-report them.

    All managed extensions share one event bus created by the manager.
    """

    def __init__(self, platform_context: "PlatformContext"):
        self.platform_context = platform_context
        self.extensions: Dict[str, "PlatformExtension"] = {}
        self.event_bus = EventBus()

    async def load_extension(
        self,
        extension_class: type,
        config: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Instantiate, configure, initialize and register an extension.

        Args:
            extension_class: Class to instantiate with no arguments.
            config: Optional settings passed to ``configure``; skipped when
                empty or ``None``.

        Returns:
            True when the extension initialized and was registered; False when
            ``initialize`` reported failure or any step raised.
        """
        try:
            # Create extension instance
            extension = extension_class()

            # Configure extension
            if config:
                await extension.configure(config)

            # Set platform context and event bus
            extension.platform_context = self.platform_context
            extension.event_bus = self.event_bus

            # Initialize extension; a falsy result is a failed load
            if not await extension.initialize(self.platform_context):
                return False

            extension.initialized = True
            self.extensions[extension.name] = extension
            await self.event_bus.emit("extension.loaded", {
                "name": extension.name,
                "category": extension.category,
                "version": extension.version,
            })
            return True
        except Exception as e:
            print(f"Failed to load extension {extension_class.__name__}: {e}")
            return False

    async def unload_extension(self, name: str) -> bool:
        """Shut down and unregister the extension called ``name``.

        Returns:
            True on success; False when no such extension is loaded or its
            shutdown raised (in which case it stays registered).
        """
        extension = self.extensions.get(name)
        if extension is None:
            return False

        try:
            await extension.shutdown()
            extension.enabled = False
            extension.initialized = False
            del self.extensions[name]
            await self.event_bus.emit("extension.unloaded", {
                "name": name,
            })
            return True
        except Exception as e:
            print(f"Failed to unload extension {name}: {e}")
            return False

    async def get_extension_health(self) -> Dict[str, Any]:
        """Collect ``health_check`` results for every loaded extension.

        An extension whose health check raises is reported with status
        ``"error"`` and the exception text instead of aborting the sweep.
        """
        health_status = {}
        for name, extension in self.extensions.items():
            try:
                health_status[name] = await extension.health_check()
            except Exception as e:
                health_status[name] = {
                    "name": name,
                    "status": "error",
                    "error": str(e),
                }
        return health_status
Extension Categories
Authentication Extensions:
from agenticflow.extensions.auth import AuthenticationExtension
from agenticflow.auth import AuthProvider
import jwt
import ldap3
class EnterpriseAuthExtension(AuthenticationExtension):
    """Enterprise authentication with LDAP, SAML, and custom providers.

    Providers are registered in ``auth_providers`` under "ldap", "saml" and
    "jwt"; ``authenticate`` picks one from the credentials' ``method`` key.
    Sessions are kept in the in-memory ``session_store`` dict.
    """

    # Single source of truth for session lifetime (1 hour), used both for the
    # stored expiry and for the ``expires_in`` value returned to the caller.
    SESSION_TTL_SECONDS = 3600

    def __init__(self):
        super().__init__(
            name="enterprise_auth",
            description="Enterprise authentication with multiple providers",
            version="2.0.0",
        )
        self.auth_providers = {}
        self.session_store = {}

    def _provider_config(self, section: str) -> Dict[str, Any]:
        """Return one config section, or ``{}`` when nothing was configured."""
        config = self.config
        return config.get(section, {}) if config is not None else {}

    async def initialize(self, platform_context: PlatformContext) -> bool:
        """Register all providers; returns False if any setup step raises."""
        try:
            # Initialize LDAP provider
            await self._setup_ldap_provider()
            # Initialize SAML provider
            await self._setup_saml_provider()
            # Initialize custom JWT provider
            await self._setup_jwt_provider()
            # Setup session management
            # NOTE(review): _setup_session_management is not defined in this
            # snippet; presumably provided by the base class - confirm.
            await self._setup_session_management()
            return True
        except Exception as e:
            print(f"Failed to initialize enterprise auth: {e}")
            return False

    async def authenticate(self, credentials: Dict[str, Any]) -> Dict[str, Any]:
        """Authenticate user with multiple provider support.

        Args:
            credentials: Provider-specific fields plus an optional ``method``
                key ("ldap" by default).

        Returns:
            On success: ``success``, ``user``, a session ``token`` and
            ``expires_in`` seconds. Otherwise ``success: False`` with an
            ``error`` message (or the provider's own failure result).
        """
        auth_method = credentials.get("method", "ldap")
        provider = self.auth_providers.get(auth_method)
        if provider is None:
            return {
                "success": False,
                "error": f"Authentication method {auth_method} not supported",
            }

        try:
            auth_result = await provider.authenticate(credentials)
            if not auth_result["success"]:
                return auth_result

            # Create session
            session_token = await self._create_session(auth_result["user"])
            return {
                "success": True,
                "user": auth_result["user"],
                "token": session_token,
                "expires_in": self.SESSION_TTL_SECONDS,
            }
        except Exception as e:
            return {
                "success": False,
                "error": f"Authentication failed: {e}",
            }

    async def _setup_ldap_provider(self):
        """Setup LDAP authentication provider"""
        from ldap3.utils.conv import escape_filter_chars

        class LDAPProvider(AuthProvider):
            def __init__(self, config):
                self.server_url = config.get("server_url")
                self.bind_dn = config.get("bind_dn")
                self.bind_password = config.get("bind_password")
                self.user_base_dn = config.get("user_base_dn")
                self.user_filter = config.get("user_filter", "(uid={username})")

            async def authenticate(self, credentials):
                """Look the user up with the service account, then bind as them.

                NOTE(review): the ldap3 calls below are synchronous and block
                the event loop while they run.
                """
                username = credentials.get("username")
                password = credentials.get("password")

                # A bind with an empty password is an unauthenticated bind,
                # which many directory servers accept - never attempt it.
                if not username or not password:
                    return {"success": False, "error": "Invalid credentials"}

                service_conn = None
                try:
                    server = ldap3.Server(self.server_url, get_info=ldap3.ALL)
                    service_conn = ldap3.Connection(
                        server,
                        self.bind_dn,
                        self.bind_password,
                        auto_bind=True,
                    )

                    # Search for user; escape the name so characters such as
                    # "*", "(" or ")" cannot rewrite the filter.
                    search_filter = self.user_filter.format(
                        username=escape_filter_chars(username)
                    )
                    service_conn.search(self.user_base_dn, search_filter, attributes=['*'])
                    if not service_conn.entries:
                        return {"success": False, "error": "User not found"}

                    user_entry = service_conn.entries[0]
                    user_dn = user_entry.entry_dn

                    # Authenticate user
                    user_conn = ldap3.Connection(server, user_dn, password)
                    try:
                        if not user_conn.bind():
                            return {"success": False, "error": "Invalid credentials"}
                    finally:
                        user_conn.unbind()

                    # Extract user information
                    user_info = {
                        "username": username,
                        "dn": user_dn,
                        "attributes": user_entry.entry_attributes_as_dict,
                    }
                    return {"success": True, "user": user_info}
                except Exception as e:
                    return {"success": False, "error": str(e)}
                finally:
                    # Always release the service-account connection.
                    if service_conn is not None:
                        service_conn.unbind()

        self.auth_providers["ldap"] = LDAPProvider(self._provider_config("ldap"))

    async def _setup_saml_provider(self):
        """Setup SAML authentication provider"""

        class SAMLProvider(AuthProvider):
            def __init__(self, config):
                self.idp_url = config.get("idp_url")
                self.sp_entity_id = config.get("sp_entity_id")
                self.certificate = config.get("certificate")

            async def authenticate(self, credentials):
                """Placeholder: SAML validation is not implemented yet.

                SECURITY: this fails closed. Returning success before the
                response signature and conditions are verified would
                authenticate any caller.
                """
                saml_response = credentials.get("saml_response")
                if not saml_response:
                    return {"success": False, "error": "Missing SAML response"}
                # TODO: validate the SAML response against ``self.certificate``
                # and extract the user information from its assertions.
                return {
                    "success": False,
                    "error": "SAML response validation is not implemented",
                }

        self.auth_providers["saml"] = SAMLProvider(self._provider_config("saml"))

    async def _setup_jwt_provider(self):
        """Setup JWT authentication provider"""

        class JWTProvider(AuthProvider):
            def __init__(self, config):
                self.secret_key = config.get("secret_key")
                self.algorithm = config.get("algorithm", "HS256")
                self.issuer = config.get("issuer")

            async def authenticate(self, credentials):
                """Verify the token's signature and issuer, then map its claims."""
                token = credentials.get("token")
                try:
                    payload = jwt.decode(
                        token,
                        self.secret_key,
                        algorithms=[self.algorithm],
                        issuer=self.issuer,
                    )
                except jwt.ExpiredSignatureError:
                    return {"success": False, "error": "Token expired"}
                except jwt.InvalidTokenError:
                    return {"success": False, "error": "Invalid token"}

                return {
                    "success": True,
                    "user": {
                        "username": payload.get("sub"),
                        "email": payload.get("email"),
                        "roles": payload.get("roles", []),
                    },
                }

        self.auth_providers["jwt"] = JWTProvider(self._provider_config("jwt"))

    async def _create_session(self, user: Dict[str, Any]) -> str:
        """Create an in-memory session for ``user`` and return its id."""
        import uuid
        from datetime import datetime, timedelta

        session_id = str(uuid.uuid4())
        # Take the clock once so all three timestamps agree exactly.
        now = datetime.utcnow()
        self.session_store[session_id] = {
            "user": user,
            "created_at": now,
            "expires_at": now + timedelta(seconds=self.SESSION_TTL_SECONDS),
            "last_activity": now,
        }
        return session_id
Storage Extensions:
from agenticflow.extensions.storage import StorageExtension
from agenticflow.storage import StorageProvider
import boto3
import aioredis
from pymongo import MongoClient
class EnterpriseStorageExtension(StorageExtension):
    """Enterprise storage with multiple backend support.

    Backends are registered in ``storage_backends``: "s3" (also "default"),
    "cache" (Redis), "documents" (MongoDB) and "filesystem".

    NOTE(review): the boto3 and pymongo clients used below are synchronous, so
    their calls block the event loop while they run.
    """

    def __init__(self):
        super().__init__(
            name="enterprise_storage",
            description="Multi-backend enterprise storage solution",
            version="1.5.0",
        )
        self.storage_backends = {}

    def _backend_config(self, section: str) -> Dict[str, Any]:
        """Return one config section, or ``{}`` when nothing was configured."""
        config = self.config
        return config.get(section, {}) if config is not None else {}

    def _get_backend(self, storage_type: str):
        """Return the named backend or raise ValueError when it is unknown."""
        if storage_type not in self.storage_backends:
            raise ValueError(f"Storage backend {storage_type} not available")
        return self.storage_backends[storage_type]

    @staticmethod
    def _decode_payload(raw):
        """Decode stored bytes/text: JSON if it parses, else text, else raw bytes."""
        import json

        if isinstance(raw, bytes):
            try:
                text = raw.decode('utf-8')
            except UnicodeDecodeError:
                # Binary payload: hand it back untouched.
                return raw
        else:
            text = raw
        try:
            return json.loads(text)
        except ValueError:
            return text

    async def initialize(self, platform_context: PlatformContext) -> bool:
        """Register all storage backends; returns False if any setup raises."""
        try:
            # Setup S3 storage
            await self._setup_s3_storage()
            # Setup Redis cache
            await self._setup_redis_cache()
            # Setup MongoDB document store
            await self._setup_mongodb_storage()
            # Setup local file system
            await self._setup_filesystem_storage()
            return True
        except Exception as e:
            print(f"Failed to initialize storage extension: {e}")
            return False

    async def store_data(
        self,
        key: str,
        data: Any,
        storage_type: str = "default",
        metadata: Dict[str, Any] = None,
    ) -> bool:
        """Store data in specified backend.

        Emits ``data_stored`` on success and ``storage_error`` (then
        re-raises) when the backend raises.

        Raises:
            ValueError: ``storage_type`` is not a registered backend.
        """
        backend = self._get_backend(storage_type)
        try:
            success = await backend.store(key, data, metadata)
            if success:
                await self.emit_event("data_stored", {
                    "key": key,
                    "storage_type": storage_type,
                    # Real length of the payload, not of its repr.
                    "size": len(data) if isinstance(data, (str, bytes)) else None,
                })
            return success
        except Exception as e:
            await self.emit_event("storage_error", {
                "key": key,
                "storage_type": storage_type,
                "error": str(e),
            })
            raise

    async def retrieve_data(self, key: str, storage_type: str = "default") -> Any:
        """Retrieve data from specified backend.

        Emits ``data_retrieved`` on success and ``retrieval_error`` (then
        re-raises) when the backend raises.

        Raises:
            ValueError: ``storage_type`` is not a registered backend.
        """
        backend = self._get_backend(storage_type)
        try:
            data = await backend.retrieve(key)
            await self.emit_event("data_retrieved", {
                "key": key,
                "storage_type": storage_type,
            })
            return data
        except Exception as e:
            await self.emit_event("retrieval_error", {
                "key": key,
                "storage_type": storage_type,
                "error": str(e),
            })
            raise

    async def _setup_s3_storage(self):
        """Setup AWS S3 storage backend (also registered as "default")"""
        import json

        decode_payload = self._decode_payload

        class S3StorageProvider(StorageProvider):
            def __init__(self, config):
                self.s3_client = boto3.client(
                    's3',
                    aws_access_key_id=config.get('access_key_id'),
                    aws_secret_access_key=config.get('secret_access_key'),
                    region_name=config.get('region', 'us-east-1'),
                )
                self.bucket_name = config.get('bucket_name')

            async def store(self, key: str, data: Any, metadata: Dict[str, Any] = None):
                try:
                    # Convert data to bytes if needed
                    if isinstance(data, str):
                        data_bytes = data.encode('utf-8')
                    elif isinstance(data, dict):
                        data_bytes = json.dumps(data).encode('utf-8')
                    else:
                        data_bytes = data

                    # Upload to S3
                    self.s3_client.put_object(
                        Bucket=self.bucket_name,
                        Key=key,
                        Body=data_bytes,
                        Metadata=metadata or {},
                    )
                    return True
                except Exception as e:
                    print(f"S3 storage error: {e}")
                    return False

            async def retrieve(self, key: str) -> Any:
                try:
                    response = self.s3_client.get_object(
                        Bucket=self.bucket_name,
                        Key=key,
                    )
                    data = response['Body'].read()
                except Exception as e:
                    print(f"S3 retrieval error: {e}")
                    raise
                # JSON if possible, else text, else the raw bytes
                return decode_payload(data)

        self.storage_backends["s3"] = S3StorageProvider(self._backend_config("s3"))
        self.storage_backends["default"] = self.storage_backends["s3"]

    async def _setup_redis_cache(self):
        """Setup Redis caching backend"""
        import json

        decode_payload = self._decode_payload

        class RedisStorageProvider(StorageProvider):
            def __init__(self, config):
                self.redis_url = config.get('url', 'redis://localhost:6379')
                self.redis = None

            async def connect(self):
                # Connect lazily on first use.
                if not self.redis:
                    self.redis = await aioredis.from_url(self.redis_url)

            async def store(self, key: str, data: Any, metadata: Dict[str, Any] = None):
                await self.connect()
                try:
                    if isinstance(data, dict):
                        payload = json.dumps(data)
                    elif isinstance(data, (str, bytes)):
                        # str(bytes) would store the "b'...'" repr instead
                        # of the payload itself.
                        payload = data
                    else:
                        payload = str(data)

                    # Set expiration if provided in metadata
                    ttl = metadata.get('ttl') if metadata else None
                    if ttl:
                        await self.redis.setex(key, ttl, payload)
                    else:
                        await self.redis.set(key, payload)
                    return True
                except Exception as e:
                    print(f"Redis storage error: {e}")
                    return False

            async def retrieve(self, key: str) -> Any:
                await self.connect()
                try:
                    data = await self.redis.get(key)
                except Exception as e:
                    print(f"Redis retrieval error: {e}")
                    raise
                # Only a missing key is None; an empty value is still a value.
                if data is None:
                    return None
                return decode_payload(data)

        self.storage_backends["cache"] = RedisStorageProvider(self._backend_config("redis"))

    async def _setup_mongodb_storage(self):
        """Setup MongoDB document storage"""
        from datetime import datetime

        class MongoDBStorageProvider(StorageProvider):
            def __init__(self, config):
                self.client = MongoClient(config.get('url', 'mongodb://localhost:27017'))
                self.db = self.client[config.get('database', 'agenticflow')]
                self.collection = self.db[config.get('collection', 'documents')]

            async def store(self, key: str, data: Any, metadata: Dict[str, Any] = None):
                try:
                    now = datetime.utcnow()
                    document = {
                        "_id": key,
                        "data": data,
                        "metadata": metadata or {},
                        "created_at": now,
                        "updated_at": now,
                    }
                    self.collection.replace_one(
                        {"_id": key},
                        document,
                        upsert=True,
                    )
                    return True
                except Exception as e:
                    print(f"MongoDB storage error: {e}")
                    return False

            async def retrieve(self, key: str) -> Any:
                try:
                    document = self.collection.find_one({"_id": key})
                    if document:
                        return document["data"]
                    return None
                except Exception as e:
                    print(f"MongoDB retrieval error: {e}")
                    raise

        self.storage_backends["documents"] = MongoDBStorageProvider(
            self._backend_config("mongodb")
        )

    async def _setup_filesystem_storage(self):
        """Setup local file system storage backend.

        ``initialize`` calls this method, so it must exist for the extension
        to load. The "filesystem" section and its ``base_dir`` key are this
        example's own choice of configuration names.
        """
        import json
        from pathlib import Path

        decode_payload = self._decode_payload

        class FilesystemStorageProvider(StorageProvider):
            def __init__(self, config):
                self.base_dir = Path(config.get('base_dir', './agenticflow_storage')).resolve()

            def _path_for(self, key: str) -> Path:
                # Keys map to paths under base_dir; reject anything that
                # resolves outside it (e.g. "../../etc/passwd").
                target = (self.base_dir / key).resolve()
                if self.base_dir not in target.parents:
                    raise ValueError(f"Invalid storage key: {key}")
                return target

            async def store(self, key: str, data: Any, metadata: Dict[str, Any] = None):
                # metadata is accepted for interface parity but not persisted.
                try:
                    if isinstance(data, bytes):
                        payload = data
                    elif isinstance(data, str):
                        payload = data.encode('utf-8')
                    else:
                        payload = json.dumps(data).encode('utf-8')
                    target = self._path_for(key)
                    target.parent.mkdir(parents=True, exist_ok=True)
                    target.write_bytes(payload)
                    return True
                except Exception as e:
                    print(f"Filesystem storage error: {e}")
                    return False

            async def retrieve(self, key: str) -> Any:
                try:
                    raw = self._path_for(key).read_bytes()
                except Exception as e:
                    print(f"Filesystem retrieval error: {e}")
                    raise
                return decode_payload(raw)

        self.storage_backends["filesystem"] = FilesystemStorageProvider(
            self._backend_config("filesystem")
        )
Custom MCP Server Development
Building MCP Servers
Custom MCP Server Framework:
from agenticflow.mcp import MCPServer, MCPServerConfig
from agenticflow.mcp.protocol import MCPRequest, MCPResponse
from typing import Dict, Any, List, Optional
import asyncio
import json
class CustomMCPServer(MCPServer):
    """Custom MCP server for specialized integrations.

    Subclasses customize the server by overriding the ``_register_tools``,
    ``_setup_resources`` and ``_initialize_capabilities`` hooks, which
    ``initialize`` awaits in that order.
    """

    def __init__(self, name: str, description: str):
        super().__init__(
            name=name,
            description=description,
            version="1.0.0",
        )
        self.tools = {}
        self.resources = {}
        self.capabilities = set()

    async def initialize(self, config: MCPServerConfig):
        """Initialize the MCP server by running the three setup hooks."""
        # Register tools
        await self._register_tools()
        # Setup resources
        await self._setup_resources()
        # Initialize capabilities
        await self._initialize_capabilities()
        print(f"MCP Server '{self.name}' initialized with {len(self.tools)} tools")

    # Default hooks: no-ops so a subclass only overrides what it needs.
    async def _register_tools(self):
        """Hook: register tools via ``register_tool``. Default does nothing."""

    async def _setup_resources(self):
        """Hook: register resources via ``register_resource``. Default does nothing."""

    async def _initialize_capabilities(self):
        """Hook: extra capability setup. Default does nothing, because
        ``register_tool``/``register_resource`` already record capabilities."""

    async def handle_request(self, request: MCPRequest) -> MCPResponse:
        """Dispatch an MCP request by method name.

        Unknown methods and any exception raised by a handler produce an
        unsuccessful ``MCPResponse`` rather than propagating.
        """
        handlers = {
            "tools/list": self._handle_tools_list,
            "tools/call": self._handle_tool_call,
            "resources/list": self._handle_resources_list,
            "resources/read": self._handle_resource_read,
        }
        try:
            handler = handlers.get(request.method)
            if handler is None:
                return MCPResponse(
                    success=False,
                    error=f"Unknown method: {request.method}",
                )
            return await handler(request)
        except Exception as e:
            return MCPResponse(
                success=False,
                error=str(e),
            )

    def register_tool(self, tool_name: str, tool_func, schema: Dict[str, Any]):
        """Register a tool callable together with its schema.

        ``schema`` may carry "description" and "parameters"; both default to
        empty values.
        """
        self.tools[tool_name] = {
            "function": tool_func,
            "schema": schema,
            "description": schema.get("description", ""),
            "parameters": schema.get("parameters", {}),
        }
        self.capabilities.add("tools")

    def register_resource(self, resource_name: str, resource_func, schema: Dict[str, Any]):
        """Register a resource callable; mime type defaults to application/json."""
        self.resources[resource_name] = {
            "function": resource_func,
            "schema": schema,
            "description": schema.get("description", ""),
            "mime_type": schema.get("mime_type", "application/json"),
        }
        self.capabilities.add("resources")

    async def _handle_tools_list(self, request: MCPRequest) -> MCPResponse:
        """Handle tools list request"""
        tools_list = [
            {
                "name": tool_name,
                "description": tool_info["description"],
                "inputSchema": tool_info["parameters"],
            }
            for tool_name, tool_info in self.tools.items()
        ]
        return MCPResponse(
            success=True,
            data={"tools": tools_list},
        )

    async def _handle_tool_call(self, request: MCPRequest) -> MCPResponse:
        """Call the named tool with the request's arguments as keywords.

        The result is stringified into a single text content item.
        """
        import inspect

        tool_name = request.params.get("name")
        arguments = request.params.get("arguments") or {}
        if tool_name not in self.tools:
            return MCPResponse(
                success=False,
                error=f"Tool '{tool_name}' not found",
            )

        tool_func = self.tools[tool_name]["function"]
        try:
            # Await whatever awaitable comes back; this also covers partials
            # and callable objects that wrap a coroutine function.
            result = tool_func(**arguments)
            if inspect.isawaitable(result):
                result = await result
            return MCPResponse(
                success=True,
                data={"content": [{"type": "text", "text": str(result)}]},
            )
        except Exception as e:
            return MCPResponse(
                success=False,
                error=f"Tool execution failed: {e}",
            )

    async def _handle_resources_list(self, request: MCPRequest) -> MCPResponse:
        """Handle resources list request"""
        resources_list = [
            {
                "uri": f"resource://{self.name}/{resource_name}",
                "name": resource_name,
                "description": resource_info["description"],
                "mimeType": resource_info["mime_type"],
            }
            for resource_name, resource_info in self.resources.items()
        ]
        return MCPResponse(
            success=True,
            data={"resources": resources_list},
        )

    async def _handle_resource_read(self, request: MCPRequest) -> MCPResponse:
        """Read the resource named by the last path segment of the URI."""
        import inspect

        uri = request.params.get("uri", "")
        resource_name = uri.rsplit("/", 1)[-1]
        if resource_name not in self.resources:
            return MCPResponse(
                success=False,
                error=f"Resource '{resource_name}' not found",
            )

        resource_info = self.resources[resource_name]
        try:
            # Call the resource function
            result = resource_info["function"]()
            if inspect.isawaitable(result):
                result = await result

            # Lists are JSON too; str() would emit a Python repr.
            if isinstance(result, (dict, list)):
                text = json.dumps(result)
            else:
                text = str(result)
            return MCPResponse(
                success=True,
                data={
                    "contents": [{
                        "uri": uri,
                        "mimeType": resource_info["mime_type"],
                        "text": text,
                    }]
                },
            )
        except Exception as e:
            return MCPResponse(
                success=False,
                error=f"Resource read failed: {e}",
            )
# Example: Custom CRM MCP Server
class CRMMCPServer(CustomMCPServer):
    """Custom MCP server for CRM integration.

    All CRM operations here are simulated: lookups and reports return canned
    data, and writes only print the record they would have saved.
    """

    def __init__(self):
        super().__init__(
            name="custom_crm",
            description="Custom CRM integration with specialized tools",
        )
        self.crm_client = None

    async def _register_tools(self):
        """Register CRM-specific tools"""
        # Customer lookup tool
        self.register_tool(
            "customer_lookup",
            self._customer_lookup,
            {
                "description": "Look up customer information by ID or email",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "identifier": {
                            "type": "string",
                            "description": "Customer ID or email address",
                        },
                        "include_orders": {
                            "type": "boolean",
                            "description": "Include order history",
                            "default": False,
                        },
                    },
                    "required": ["identifier"],
                },
            },
        )

        # Create customer tool
        self.register_tool(
            "create_customer",
            self._create_customer,
            {
                "description": "Create a new customer record",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "email": {"type": "string", "format": "email"},
                        "phone": {"type": "string"},
                        "company": {"type": "string"},
                        "notes": {"type": "string"},
                    },
                    "required": ["name", "email"],
                },
            },
        )

        # Sales pipeline tool
        self.register_tool(
            "update_opportunity",
            self._update_opportunity,
            {
                "description": "Update sales opportunity status",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "opportunity_id": {"type": "string"},
                        "stage": {
                            "type": "string",
                            "enum": ["lead", "qualified", "proposal", "negotiation", "closed_won", "closed_lost"],
                        },
                        "value": {"type": "number"},
                        "notes": {"type": "string"},
                    },
                    "required": ["opportunity_id", "stage"],
                },
            },
        )

    async def _setup_resources(self):
        """Setup CRM resources"""
        # Customer database resource
        self.register_resource(
            "customer_database",
            self._get_customer_database,
            {
                "description": "Access to customer database information",
                "mime_type": "application/json",
            },
        )

        # Sales reports resource
        self.register_resource(
            "sales_reports",
            self._get_sales_reports,
            {
                "description": "Sales performance reports and analytics",
                "mime_type": "application/json",
            },
        )

    async def _customer_lookup(self, identifier: str, include_orders: bool = False):
        """Return a canned customer record, optionally with order history.

        ``identifier`` is echoed back in the ``email`` field.
        """
        # Simulate CRM API call
        customer_data = {
            "customer_id": "CUST_12345",
            "name": "John Doe",
            "email": identifier,
            "phone": "+1-555-0123",
            "company": "Tech Solutions Inc",
            "created_date": "2024-01-15",
            "total_spent": 15000,
            "last_contact": "2024-01-20",
        }
        if include_orders:
            customer_data["orders"] = [
                {"order_id": "ORD_001", "date": "2024-01-10", "amount": 5000},
                {"order_id": "ORD_002", "date": "2024-01-18", "amount": 10000},
            ]
        return customer_data

    async def _create_customer(self, name: str, email: str, **kwargs):
        """Build a new customer record and report its generated id.

        Optional ``phone``, ``company`` and ``notes`` are read from kwargs.
        """
        import uuid
        from datetime import date

        customer_id = f"CUST_{str(uuid.uuid4())[:8].upper()}"
        customer_record = {
            "customer_id": customer_id,
            "name": name,
            "email": email,
            "phone": kwargs.get("phone"),
            "company": kwargs.get("company"),
            "notes": kwargs.get("notes"),
            # Stamp with the actual creation day rather than a fixed date.
            "created_date": date.today().isoformat(),
            "status": "active",
        }

        # Simulate database save
        print(f"Created customer: {customer_record}")
        return {
            "success": True,
            "customer_id": customer_id,
            "message": "Customer created successfully",
        }

    async def _update_opportunity(self, opportunity_id: str, stage: str, **kwargs):
        """Build an opportunity update and report success.

        Optional ``value`` and ``notes`` are read from kwargs.
        """
        from datetime import date

        opportunity_update = {
            "opportunity_id": opportunity_id,
            "stage": stage,
            "value": kwargs.get("value"),
            "notes": kwargs.get("notes"),
            # Stamp with the actual update day rather than a fixed date.
            "updated_date": date.today().isoformat(),
            "updated_by": "mcp_server",
        }

        # Simulate CRM update
        print(f"Updated opportunity: {opportunity_update}")
        return {
            "success": True,
            "message": f"Opportunity {opportunity_id} updated to {stage}",
        }

    async def _get_customer_database(self):
        """Get customer database summary (canned data)"""
        return {
            "total_customers": 1250,
            "active_customers": 1100,
            "new_this_month": 45,
            "top_customers": [
                {"name": "Enterprise Corp", "spent": 50000},
                {"name": "Tech Solutions", "spent": 35000},
                {"name": "Innovation Inc", "spent": 28000},
            ],
        }

    async def _get_sales_reports(self):
        """Get sales reports (canned data)"""
        return {
            "monthly_revenue": 125000,
            "deals_closed": 15,
            "pipeline_value": 450000,
            "conversion_rate": 0.23,
            "top_performers": [
                {"name": "Alice Johnson", "deals": 8, "revenue": 45000},
                {"name": "Bob Smith", "deals": 5, "revenue": 32000},
            ],
        }
# Usage example
async def run_custom_mcp_server():
    """Run the custom CRM MCP server"""
    server = CRMMCPServer()

    # Initialize server
    transport = "stdio"  # or "http"
    config = MCPServerConfig(
        host="localhost",
        port=8080,
        transport=transport,
    )
    await server.initialize(config)

    # Announce before starting: start() presumably serves until shutdown
    # (TODO confirm), so a message printed after it would appear too late.
    # Report the transport actually configured; host/port only matter for
    # the "http" transport.
    print(f"Custom CRM MCP Server starting ({transport} transport)")

    # Start server
    await server.start()


if __name__ == "__main__":
    asyncio.run(run_custom_mcp_server())
MCP Protocol Implementation
Advanced MCP Protocol Features:
from agenticflow.mcp.protocol import MCPProtocol
from agenticflow.mcp.transport import MCPTransport
import asyncio
import websockets
import json
class AdvancedMCPServer(CustomMCPServer):
    """Advanced MCP server with streaming and notifications.

    Extends ``CustomMCPServer`` because it relies on that class's
    ``register_tool`` and on its setup hooks such as ``_register_tools``.
    """

    def __init__(self, name: str):
        super().__init__(name, f"Advanced MCP server: {name}")
        self.subscriptions = {}
        self.streaming_tools = {}
        # Strong references to running stream tasks; the event loop itself
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected mid-run.
        self._stream_tasks = set()

    def register_streaming_tool(self, tool_name: str, stream_func, schema: Dict[str, Any]):
        """Register a streaming tool that returns continuous data.

        ``stream_func`` is called with the tool arguments and is iterated
        with ``async for``. The tool is also exposed as a regular tool
        whose call starts the stream in the background.
        """
        self.streaming_tools[tool_name] = {
            "function": stream_func,
            "schema": schema,
        }

        # Bind the tool name here: the regular tool-call path only passes the
        # client's arguments, so the wrapper could not otherwise know which
        # streaming tool was invoked.
        async def start_stream(**kwargs):
            return await self._handle_streaming_tool_wrapper(
                **{**kwargs, "__tool_name__": tool_name}
            )

        # Also register as regular tool
        self.register_tool(tool_name, start_stream, schema)

    async def _handle_streaming_tool_wrapper(self, **kwargs):
        """Start the stream named by the ``__tool_name__`` keyword.

        The remaining keywords are forwarded to the stream function.

        Returns:
            A dict with ``message`` and ``stream_id`` when the stream was
            started, or ``{"error": ...}`` for an unknown tool.
        """
        import uuid

        tool_name = kwargs.pop("__tool_name__", None)
        entry = self.streaming_tools.get(tool_name)
        if entry is None:
            return {"error": "Streaming tool not found"}

        # Start streaming in background
        task = asyncio.create_task(
            self._stream_tool_data(tool_name, entry["function"], kwargs)
        )
        self._stream_tasks.add(task)
        task.add_done_callback(self._stream_tasks.discard)
        return {
            "message": f"Streaming started for {tool_name}",
            "stream_id": f"{tool_name}_{uuid.uuid4().hex}",
        }

    async def _stream_tool_data(self, tool_name: str, stream_func, kwargs):
        """Forward each item of the stream as a ``tools/stream`` notification.

        A failure in the stream is reported as ``tools/stream_error``.
        """
        from datetime import datetime

        try:
            async for data in stream_func(**kwargs):
                # Send notification to subscribed clients
                await self.send_notification("tools/stream", {
                    "tool": tool_name,
                    "data": data,
                    "timestamp": datetime.utcnow().isoformat(),
                })
                await asyncio.sleep(0.1)  # Rate limiting
        except Exception as e:
            await self.send_notification("tools/stream_error", {
                "tool": tool_name,
                "error": str(e),
            })

    async def send_notification(self, method: str, params: Dict[str, Any]):
        """Send a JSON-RPC 2.0 notification to all connected clients.

        A failed send to one client is logged and does not stop delivery to
        the others.
        """
        payload = json.dumps({
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
        })

        # NOTE(review): ``connected_clients`` is not set by this class; it is
        # presumably maintained by the transport layer. Treat it as empty
        # when absent. Iterate over a snapshot because clients may connect or
        # disconnect while a send is awaited.
        clients = getattr(self, "connected_clients", None) or {}
        for client_id, client in list(clients.items()):
            try:
                await client.send(payload)
            except Exception as e:
                print(f"Failed to send notification to client {client_id}: {e}")
# Example: Real-time monitoring MCP server
class MonitoringMCPServer(AdvancedMCPServer):
    """MCP server for real-time system monitoring.

    Exposes two streaming tools: ``stream_system_metrics`` and
    ``stream_logs``.
    """

    def __init__(self):
        super().__init__("monitoring")

    async def _register_tools(self):
        """Register monitoring tools"""
        # System metrics streaming tool
        self.register_streaming_tool(
            "stream_system_metrics",
            self._stream_system_metrics,
            {
                "description": "Stream real-time system metrics",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "interval": {
                            "type": "number",
                            "description": "Update interval in seconds",
                            "default": 5.0,
                        },
                        "metrics": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Metrics to monitor",
                            "default": ["cpu", "memory", "disk"],
                        },
                    },
                },
            },
        )

        # Log monitoring tool
        self.register_streaming_tool(
            "stream_logs",
            self._stream_logs,
            {
                "description": "Stream log files in real-time",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "log_file": {
                            "type": "string",
                            "description": "Path to log file",
                        },
                        "filter_pattern": {
                            "type": "string",
                            "description": "Regex pattern to filter logs",
                        },
                    },
                    "required": ["log_file"],
                },
            },
        )

    async def _stream_system_metrics(self, interval: float = 5.0, metrics: List[str] = None):
        """Yield a metrics snapshot forever, sleeping ``interval`` seconds between.

        Args:
            interval: Seconds to sleep after each snapshot.
            metrics: Any of "cpu", "memory", "disk"; defaults to all three.
        """
        import psutil

        metrics = metrics or ["cpu", "memory", "disk"]
        loop = asyncio.get_running_loop()

        while True:
            metric_data = {}

            if "cpu" in metrics:
                # cpu_percent with interval=1 sleeps for a second; run it in
                # the default executor so the event loop stays responsive.
                cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 1)
                cpu_freq = psutil.cpu_freq()
                metric_data["cpu"] = {
                    "percent": cpu_percent,
                    "count": psutil.cpu_count(),
                    "freq": cpu_freq._asdict() if cpu_freq else None,
                }

            if "memory" in metrics:
                memory = psutil.virtual_memory()
                metric_data["memory"] = {
                    "percent": memory.percent,
                    "total": memory.total,
                    "used": memory.used,
                    "available": memory.available,
                }

            if "disk" in metrics:
                disk = psutil.disk_usage('/')
                metric_data["disk"] = {
                    # Guard against a zero-size filesystem.
                    "percent": disk.used / disk.total * 100 if disk.total else 0.0,
                    "total": disk.total,
                    "used": disk.used,
                    "free": disk.free,
                }

            yield metric_data
            await asyncio.sleep(interval)

    async def _stream_logs(self, log_file: str, filter_pattern: str = None):
        """Follow ``log_file`` from its current end and yield new lines.

        Only lines matching ``filter_pattern`` (searched as a regex) are
        yielded when a pattern is given. On failure a single
        ``{"error": ...}`` item is yielded and the stream ends.

        SECURITY(review): ``log_file`` and ``filter_pattern`` come straight
        from the tool caller. As written, any file readable by the server
        process can be streamed and an expensive regex can stall matching;
        restrict paths to an allow-listed directory before exposing this.
        """
        import re
        from datetime import datetime

        import aiofiles

        try:
            # Compile inside the try so an invalid pattern is reported through
            # the stream like every other failure.
            pattern = re.compile(filter_pattern) if filter_pattern else None

            async with aiofiles.open(log_file, 'r') as file:
                # Seek to end of file
                await file.seek(0, 2)

                while True:
                    line = await file.readline()
                    if not line:
                        # Nothing new yet; poll again shortly.
                        await asyncio.sleep(0.1)
                        continue

                    # Apply filter if specified
                    if pattern is None or pattern.search(line):
                        yield {
                            "timestamp": datetime.utcnow().isoformat(),
                            "line": line.strip(),
                            "source": log_file,
                        }
        except Exception as e:
            yield {
                "error": f"Failed to read log file {log_file}: {e}"
            }
Workflow Engine Extensions
Custom Workflow Engines
Specialized Workflow Engine:
from agenticflow.engines import WorkflowEngine, WorkflowExecutionContext
from agenticflow.workflow import Workflow, WorkflowNode, WorkflowEdge
from typing import Dict, Any, List, Optional
import asyncio
class DistributedWorkflowEngine(WorkflowEngine):
"""Distributed workflow engine for multi-node execution"""
def __init__(self, cluster_config: Dict[str, Any]):
    """Create the engine and start with no workers, balancer or executions.

    Args:
        cluster_config: Cluster settings, stored as-is on the instance.
    """
    super().__init__("distributed_engine", "1.0.0")
    # Empty runtime state
    self.execution_tracker = {}
    self.load_balancer = None
    self.worker_nodes = {}
    # Caller-supplied cluster settings
    self.cluster_config = cluster_config
async def initialize(self, platform_context):
    """Initialize distributed workflow engine.

    Sets up the cluster and the load balancer, then starts worker health
    monitoring as a background task.

    Returns:
        True once setup has completed.
    """
    # Setup cluster connectivity
    await self._setup_cluster()
    # Initialize load balancer
    await self._setup_load_balancer()
    # Start worker health monitoring. Keep a reference to the task: the
    # event loop only holds weak references, so an unreferenced task can
    # be garbage-collected before it finishes.
    self._health_monitor_task = asyncio.create_task(self._monitor_worker_health())
    return True
async def execute_workflow(
    self,
    workflow: Workflow,
    context: WorkflowExecutionContext
) -> Dict[str, Any]:
    """Execute workflow across distributed nodes.

    Progress is recorded in ``self.execution_tracker`` under the context's
    ``execution_id``: status "running" at the start, then "completed"
    (with ``end_time`` and ``results``) or "failed" (with ``error``).

    Returns:
        The results of the distributed execution.

    Raises:
        Exception: whatever the analysis or execution step raised, after
            the tracker entry has been marked failed.
    """
    from datetime import datetime

    execution_id = context.execution_id

    # Create the tracker entry before any step that can fail, so the
    # failure handler always has a record to update and cannot mask the
    # original error with a KeyError.
    record = {
        "workflow": workflow,
        "context": context,
        "distribution_plan": None,
        "status": "running",
        "start_time": datetime.utcnow(),
        "node_executions": {},
    }
    self.execution_tracker[execution_id] = record

    try:
        # Analyze workflow for distribution opportunities
        distribution_plan = await self._analyze_workflow_distribution(workflow)
        record["distribution_plan"] = distribution_plan

        # Execute workflow nodes according to distribution plan
        results = await self._execute_distributed_workflow(
            workflow,
            context,
            distribution_plan
        )
    except Exception as e:
        record["status"] = "failed"
        record["error"] = str(e)
        raise

    # Update tracker
    record["status"] = "completed"
    record["end_time"] = datetime.utcnow()
    record["results"] = results
    return results
async def _analyze_workflow_distribution(self, workflow: Workflow) -> Dict[str, Any]:
"""Analyze workflow for optimal distribution"""
distribution_plan = {
"parallelizable_groups": [],
"sequential_dependencies": [],
"resource_requirements": {},
"node_assignments": {}
}
# Find parallelizable node groups
parallelizable_groups = self._find_parallel_groups(workflow)
distribution_plan["parallelizable_groups"] = parallelizable_groups
# Analyze resource requirements
for node in workflow.nodes:
resource_req = self._analyze_node_resources(node)
distribution_plan["resource_requirements"][node.id] = resource_req
# Assign node to best worker
best_worker = await self._select_worker_for_node(node, resource_req)
distribution_plan["node_assignments"][node.id] = best_worker
return distribution_plan
def _find_parallel_groups(self, workflow: Workflow) -> List[List[str]]:
"""Find groups of nodes that can run in parallel"""
# Build dependency graph
dependencies = {}
for edge in workflow.edges:
if edge.target not in dependencies:
dependencies[edge.target] = []
dependencies[edge.target].append(edge.source)
# Find levels of execution
levels = []
processed = set()
while len(processed) < len(workflow.nodes):
# Find nodes with no unprocessed dependencies
current_level = []
for node in workflow.nodes:
if node.id not in processed:
node_deps = dependencies.get(node.id, [])
if all(dep in processed for dep in node_deps):
current_level.append(node.id)
if current_level:
levels.append(current_level)
processed.update(current_level)
else:
break # Prevent infinite loop
return levels
def _analyze_node_resources(self, node: WorkflowNode) -> Dict[str, Any]:
"""Analyze resource requirements for a node"""
# Default resource requirements
resources = {
"cpu_cores": 1,
"memory_mb": 512,
"gpu_required": False,
"network_intensive": False,
"storage_mb": 100
}
# Analyze node type and configuration
if node.type == "ai_llm":
resources.update({
"cpu_cores": 2,
"memory_mb": 2048,
"gpu_required": node.config.get("model", "").startswith("large"),
"network_intensive": True
})
elif node.type == "data_processing":
resources.update({
"cpu_cores": 4,
"memory_mb": 4096,
"storage_mb": 1000
})
elif node.type == "image_processing":
resources.update({
"cpu_cores": 2,
"memory_mb": 3072,
"gpu_required": True
})
return resources
async def _select_worker_for_node(
self,
node: WorkflowNode,
resource_req: Dict[str, Any]
) -> str:
"""Select best worker node for execution"""
best_worker = None
best_score = -1
for worker_id, worker_info in self.worker_nodes.items():
if not worker_info["healthy"]:
continue
# Check resource availability
if not self._worker_can_handle_requirements(worker_info, resource_req):
continue
# Calculate suitability score
score = self._calculate_worker_score(worker_info, resource_req)
if score > best_score:
best_score = score
best_worker = worker_id
return best_worker or "localhost" # Fallback to local execution
def _worker_can_handle_requirements(
self,
worker_info: Dict[str, Any],
resource_req: Dict[str, Any]
) -> bool:
"""Check if worker can handle resource requirements"""
capabilities = worker_info["capabilities"]
# Check CPU cores
if resource_req["cpu_cores"] > capabilities.get("cpu_cores", 1):
return False
# Check memory
if resource_req["memory_mb"] > capabilities.get("available_memory_mb", 0):
return False
# Check GPU requirement
if resource_req["gpu_required"] and not capabilities.get("gpu_available", False):
return False
return True
def _calculate_worker_score(
self,
worker_info: Dict[str, Any],
resource_req: Dict[str, Any]
) -> float:
"""Calculate suitability score for worker"""
score = 0.0
capabilities = worker_info["capabilities"]
current_load = worker_info.get("current_load", 0.5)
# Prefer less loaded workers
score += (1.0 - current_load) * 50
# Prefer workers with more available resources
cpu_ratio = capabilities.get("cpu_cores", 1) / resource_req["cpu_cores"]
memory_ratio = capabilities.get("available_memory_mb", 512) / resource_req["memory_mb"]
score += min(cpu_ratio, 2.0) * 20
score += min(memory_ratio, 2.0) * 20
# Bonus for GPU availability when needed
if resource_req["gpu_required"] and capabilities.get("gpu_available"):
score += 30
# Network connectivity score
network_score = worker_info.get("network_score", 1.0)
score += network_score * 10
return score
async def _execute_distributed_workflow(
self,
workflow: Workflow,
context: WorkflowExecutionContext,
distribution_plan: Dict[str, Any]
) -> Dict[str, Any]:
"""Execute workflow using distribution plan"""
execution_results = {}
node_outputs = {}
# Execute nodes level by level
for level_nodes in distribution_plan["parallelizable_groups"]:
# Execute all nodes in this level in parallel
level_tasks = []
for node_id in level_nodes:
node = next(n for n in workflow.nodes if n.id == node_id)
worker_id = distribution_plan["node_assignments"][node_id]
task = self._execute_node_on_worker(
node,
worker_id,
node_outputs,
context
)
level_tasks.append((node_id, task))
# Wait for all nodes in level to complete
for node_id, task in level_tasks:
try:
result = await task
execution_results[node_id] = result
node_outputs[node_id] = result.get("output")
except Exception as e:
execution_results[node_id] = {"error": str(e)}
# Handle failure - could implement retry logic here
return {
"execution_results": execution_results,
"final_output": node_outputs.get(workflow.output_node_id),
"distribution_stats": self._get_distribution_stats(distribution_plan)
}
async def _execute_node_on_worker(
self,
node: WorkflowNode,
worker_id: str,
node_outputs: Dict[str, Any],
context: WorkflowExecutionContext
) -> Dict[str, Any]:
"""Execute a single node on specified worker"""
if worker_id == "localhost" or worker_id not in self.worker_nodes:
# Execute locally
return await self._execute_node_locally(node, node_outputs, context)
# Execute on remote worker
worker_client = self.worker_nodes[worker_id]["client"]
execution_request = {
"node": node.to_dict(),
"inputs": {
input_name: node_outputs.get(input_source)
for input_name, input_source in node.input_mappings.items()
},
"context": context.to_dict()
}
try:
result = await worker_client.execute_node(execution_request)
return result
except Exception as e:
# Fallback to local execution
print(f"Remote execution failed for {node.id}, falling back to local: {e}")
return await self._execute_node_locally(node, node_outputs, context)
async def _setup_cluster(self):
    """Connect to every configured worker and register the reachable ones.

    Reads the ``"nodes"`` list from ``self.cluster_config``; each entry must
    provide ``"id"``, ``"address"`` and ``"port"``. A worker whose health
    check raises is reported and skipped, so one unreachable worker does not
    abort setup.
    """
    cluster_nodes = self.cluster_config.get("nodes", [])
    for node_config in cluster_nodes:
        worker_id = node_config["id"]
        # Initialize worker client
        worker_client = WorkerClient(node_config["address"], node_config["port"])
        try:
            # Test connectivity
            health = await worker_client.health_check()
            self.worker_nodes[worker_id] = {
                "config": node_config,
                "client": worker_client,
                "healthy": health["healthy"],
                "capabilities": health.get("capabilities", {}),
                "current_load": health.get("load", 0.0),
                "network_score": 1.0,  # Will be updated by monitoring
                "last_health_check": datetime.utcnow(),
            }
            # The status glyphs were mis-encoded in the original text (one
            # even split the string literal across two lines); restored here.
            print(f"✅ Connected to worker {worker_id}")
        except Exception as e:
            print(f"❌ Failed to connect to worker {worker_id}: {e}")
async def _monitor_worker_health(self):
"""Monitor worker node health"""
while True:
for worker_id, worker_info in self.worker_nodes.items():
try:
health = await worker_info["client"].health_check()
worker_info.update({
"healthy": health["healthy"],
"current_load": health.get("load", 0.0),
"last_health_check": datetime.utcnow()
})
except Exception as e:
worker_info["healthy"] = False
print(f"β οΈ Worker {worker_id} health check failed: {e}")
await asyncio.sleep(30) # Check every 30 seconds
class WorkerClient:
    """Placeholder client for talking to a worker node.

    Both calls currently return fixed sample data; no network traffic takes
    place.
    """

    def __init__(self, address: str, port: int):
        """Remember where the worker lives; no connection is opened."""
        self.address, self.port = address, port
        self.session = None

    async def health_check(self) -> Dict[str, Any]:
        """Return the worker's health report (fixed sample values for now)."""
        # Implement actual health check
        capabilities = dict(cpu_cores=8, available_memory_mb=16384, gpu_available=True)
        return dict(healthy=True, load=0.3, capabilities=capabilities)

    async def execute_node(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Return the execution result (fixed sample values; ``request`` is unused)."""
        # Implement actual node execution
        return dict(
            success=True,
            output="Node executed successfully on worker",
            execution_time=1.5,
        )
Deployment & Management
Extension Deployment Pipeline
Automated Extension Deployment:
from agenticflow.deployment import ExtensionDeployer
from agenticflow.packaging import ExtensionPackage
import docker
import kubernetes
import yaml
class ExtensionDeploymentPipeline:
    """Pipeline that builds, scans, deploys and registers platform extensions."""

    def __init__(self, deployment_config: Dict[str, Any]):
        """Store the configuration and create the Docker and Kubernetes clients.

        Args:
            deployment_config: Pipeline settings; ``"replicas"`` is read when
                the Kubernetes deployment manifest is generated.
        """
        self.config = deployment_config
        # Docker client configured from the process environment.
        self.docker_client = docker.from_env()
        # API client shared by the Kubernetes API wrappers created per deployment.
        self.k8s_client = kubernetes.client.ApiClient()
async def deploy_extension(
    self,
    extension_package: ExtensionPackage,
    environment: str = "production"
) -> Dict[str, Any]:
    """Deploy an extension: build, scan, deploy, health-check, register.

    The pipeline stops at the first step that raises or that reports
    ``"status": "failed"`` in its result, so later steps never run against a
    deployment that did not succeed.

    Args:
        extension_package: Package to deploy; ``name`` and ``version`` are
            read here.
        environment: Target environment name.

    Returns:
        A report with ``extension``, ``version``, ``environment``,
        ``status`` (``"deployed"`` or ``"failed"``), the ``steps`` that ran
        and, on success, ``deployment_url``; on failure, ``error``.
        Failures are reported in this dict rather than raised.
    """
    deployment_result = {
        "extension": extension_package.name,
        "version": extension_package.version,
        "environment": environment,
        "status": "deploying",
        "steps": [],
    }

    def record(step_result):
        """Log a step result and abort the pipeline if the step failed."""
        deployment_result["steps"].append(step_result)
        if step_result.get("status") == "failed":
            # Some steps report failure through their result instead of
            # raising; turn that into an exception so the shared handler
            # below marks the whole deployment as failed.
            raise RuntimeError(
                step_result.get("error")
                or f"Deployment step {step_result.get('step')!r} failed"
            )
        return step_result

    try:
        # Step 1: Build container image
        image_result = record(await self._build_container_image(extension_package))
        # Step 2: Run security scan
        record(await self._run_security_scan(image_result["image_id"]))
        # Step 3: Deploy to Kubernetes
        k8s_result = record(
            await self._deploy_to_kubernetes(
                extension_package,
                image_result["image_id"],
                environment,
            )
        )
        # Step 4: Run health checks
        record(
            await self._run_deployment_health_checks(
                extension_package.name,
                environment,
            )
        )
        # Step 5: Register with platform
        record(await self._register_with_platform(extension_package, environment))
        deployment_result["status"] = "deployed"
        deployment_result["deployment_url"] = k8s_result.get("service_url")
    except Exception as e:
        deployment_result["status"] = "failed"
        deployment_result["error"] = str(e)
    return deployment_result
async def _build_container_image(self, package: ExtensionPackage) -> Dict[str, Any]:
"""Build Docker container for extension"""
# Generate Dockerfile
dockerfile_content = self._generate_dockerfile(package)
# Build image
image, build_logs = self.docker_client.images.build(
fileobj=io.BytesIO(dockerfile_content.encode()),
tag=f"{package.name}:{package.version}",
rm=True,
forcerm=True
)
return {
"step": "build_image",
"status": "completed",
"image_id": image.id,
"image_tag": f"{package.name}:{package.version}",
"size": image.attrs["Size"]
}
def _generate_dockerfile(self, package: ExtensionPackage) -> str:
"""Generate Dockerfile for extension"""
dockerfile = f"""
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \\
gcc \\
&& rm -rf /var/lib/apt/lists/*
# Copy extension files
COPY {package.source_path} /app/
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Install the extension
RUN pip install --no-cache-dir .
# Set environment variables
ENV PYTHONPATH=/app
ENV EXTENSION_NAME={package.name}
ENV EXTENSION_VERSION={package.version}
# Expose port
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \\
CMD curl -f http://localhost:8080/health || exit 1
# Run extension
CMD ["python", "-m", "{package.entry_point}"]
"""
return dockerfile.strip()
async def _run_security_scan(self, image_id: str) -> Dict[str, Any]:
"""Run security scan on container image"""
# Implement security scanning (e.g., with Trivy, Clair, etc.)
scan_result = {
"step": "security_scan",
"status": "completed",
"vulnerabilities": {
"critical": 0,
"high": 2,
"medium": 5,
"low": 12
},
"scan_passed": True # Based on threshold policy
}
if scan_result["vulnerabilities"]["critical"] > 0:
scan_result["status"] = "failed"
scan_result["scan_passed"] = False
raise Exception("Critical vulnerabilities found in image")
return scan_result
async def _deploy_to_kubernetes(
    self,
    package: ExtensionPackage,
    image_id: str,
    environment: str
) -> Dict[str, Any]:
    """Create or update the extension's ConfigMap, Deployment and Service.

    Each resource is patched first and created only when the API answers
    404. Any other API error aborts the step.

    Args:
        package: Extension package; ``name`` determines the resource names.
        image_id: Image identifier forwarded to the deployment manifest
            generator.
        environment: Target environment; resources go to the
            ``agenticflow-<environment>`` namespace.

    Returns:
        A step report. On success: ``step``, ``status="completed"``,
        ``namespace`` and ``service_url``. On failure: ``step``,
        ``status="failed"`` and ``error``; API failures are reported this
        way rather than raised.
    """
    # Generate Kubernetes manifests
    deployment_manifest = self._generate_k8s_deployment(package, image_id, environment)
    service_manifest = self._generate_k8s_service(package, environment)
    configmap_manifest = self._generate_k8s_configmap(package, environment)
    # Apply manifests
    k8s_apps_v1 = kubernetes.client.AppsV1Api(self.k8s_client)
    k8s_core_v1 = kubernetes.client.CoreV1Api(self.k8s_client)
    namespace = f"agenticflow-{environment}"

    def upsert(patch, create, name, manifest):
        """Patch the named resource, creating it if it does not exist yet."""
        try:
            patch(name=name, namespace=namespace, body=manifest)
        except kubernetes.client.ApiException as e:
            if e.status != 404:
                # Only "not found" means the resource must be created;
                # anything else (403, 409, 5xx, ...) is a real failure and
                # must not be reported as a completed deployment.
                raise
            create(namespace=namespace, body=manifest)

    try:
        # Create/update ConfigMap
        upsert(
            k8s_core_v1.patch_namespaced_config_map,
            k8s_core_v1.create_namespaced_config_map,
            f"{package.name}-config",
            configmap_manifest,
        )
        # Create/update Deployment
        upsert(
            k8s_apps_v1.patch_namespaced_deployment,
            k8s_apps_v1.create_namespaced_deployment,
            package.name,
            deployment_manifest,
        )
        # Create/update Service
        upsert(
            k8s_core_v1.patch_namespaced_service,
            k8s_core_v1.create_namespaced_service,
            f"{package.name}-service",
            service_manifest,
        )
        return {
            "step": "kubernetes_deploy",
            "status": "completed",
            "namespace": namespace,
            "service_url": f"http://{package.name}-service.{namespace}.svc.cluster.local:8080",
        }
    except Exception as e:
        return {
            "step": "kubernetes_deploy",
            "status": "failed",
            "error": str(e),
        }
def _generate_k8s_deployment(
self,
package: ExtensionPackage,
image_id: str,
environment: str
) -> Dict[str, Any]:
"""Generate Kubernetes deployment manifest"""
return {
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": package.name,
"labels": {
"app": package.name,
"version": package.version,
"environment": environment,
"component": "extension"
}
},
"spec": {
"replicas": self.config.get("replicas", 2),
"selector": {
"matchLabels": {
"app": package.name
}
},
"template": {
"metadata": {
"labels": {
"app": package.name,
"version": package.version
}
},
"spec": {
"containers": [{
"name": package.name,
"image": f"{package.name}:{package.version}",
"ports": [{
"containerPort": 8080,
"name": "http"
}],
"env": [
{
"name": "ENVIRONMENT",
"value": environment
},
{
"name": "EXTENSION_NAME",
"value": package.name
}
],
"resources": {
"requests": {
"memory": "256Mi",
"cpu": "100m"
},
"limits": {
"memory": "512Mi",
"cpu": "500m"
}
},
"livenessProbe": {
"httpGet": {
"path": "/health",
"port": 8080
},
"initialDelaySeconds": 30,
"periodSeconds": 10
},
"readinessProbe": {
"httpGet": {
"path": "/ready",
"port": 8080
},
"initialDelaySeconds": 5,
"periodSeconds": 5
}
}]
}
}
}
}
Next Steps & Advanced Topics
Advanced Extension Development
Extension Security Guide - Security best practices for extensions
Performance Optimization - Optimize extension performance
Monitoring & Observability - Monitor extension health and performance
🛠️ Extension Resources
Extension SDK Reference - Complete SDK documentation
Deployment Automation - Automated deployment pipelines
Testing Strategies - Comprehensive testing approaches
💬 Community & Support
Extension Developer Community - Connect with extension developers
Marketplace Submission - Publish your extensions
Enterprise Support - Professional extension support
🧩 Platform extensions unlock the full potential of AgenticFlow by allowing deep integration with core platform capabilities. From custom authentication systems to distributed workflow engines, extensions let you build enterprise-grade solutions that extend and enhance the entire AgenticFlow ecosystem. Start building platform extensions today and shape the future of AI automation.
Extend the platform, empower the ecosystem, enable the impossible.
Last updated
Was this helpful?