Storage-Custodian
Overview
Storage-Custodian is the storage management agent of EchoCenter, responsible for monitoring and managing the storage system. It is a Python agent that communicates with the backend via WebSocket.
Features
1. Storage Monitoring
Storage-Custodian monitors files and disk space in the storage directory:
python
def get_storage_stats():
files = []
total_size = 0
for f in os.listdir(STORAGE_DIR):
fp = os.path.join(STORAGE_DIR, f)
if os.path.isfile(fp):
files.append({
"name": os.path.basename(fp),
"size": os.path.getsize(fp),
"modified": datetime.fromtimestamp(os.path.getmtime(fp))
})
total_size += os.path.getsize(fp)
return {
"total_files": len(files),
"total_size": total_size,
"files": files
}2. System Log
Storage-Custodian periodically sends system logs:
python
stats = get_storage_stats()
await ws.send(json.dumps({
"type": "SYSTEM_LOG",
"payload": {
"level": "SUCCESS",
"content": f"Storage-Custodian initialized. Monitoring {STORAGE_DIR}. {stats['total_files']} files detected."
}
}))3. Respond to Instructions
Storage-Custodian supports two response paths:
- Butler-originated request: send one final
CHAT. - User-originated request: send
CHAT_STREAMchunks, then finalCHAT(samestream_id) for persistence, thenCHAT_STREAM_END.
python
async def handle_command(command):
if command.startswith("get_status"):
stats = get_storage_stats()
return f"Storage Status: {stats['total_files']} files, {stats['total_size']} bytes"
elif command.startswith("list_files"):
# List files
pass
elif command.startswith("get_file"):
# Get file
passConfiguration
Environment Variables
env
# Butler Configuration (Storage-Custodian uses the same Butler configuration)
BUTLER_BASE_URL=https://api.siliconflow.cn/v1
BUTLER_API_TOKEN=your_api_token_here
BUTLER_MODEL=gpt-3.5-turboStorage Directory
Storage-Custodian uses mock_agents/hive_storage as the storage directory:
python
# Use a path relative to the script location
script_dir = os.path.dirname(os.path.abspath(__file__))
STORAGE_DIR = os.path.join(script_dir, "hive_storage")
if not os.path.exists(STORAGE_DIR):
os.makedirs(STORAGE_DIR)Connection
Connection Address
python
import asyncio
import websockets
import json
ECHOCENTER_WS_URL = "ws://localhost:8080/api/ws"
async def agent_loop(api_token):
uri = f"{ECHOCENTER_WS_URL}?token={api_token}"
async with websockets.connect(uri) as ws:
# Send system log
stats = get_storage_stats()
await ws.send(json.dumps({
"type": "SYSTEM_LOG",
"payload": {
"level": "SUCCESS",
"content": f"Storage-Custodian initialized. Monitoring {STORAGE_DIR}. {stats['total_files']} files detected."
}
}))
# Receive messages
async for message in ws:
msg = json.loads(message)
await handle_message(msg)Connection Flow
1. Start Storage-Custodian
↓
2. Connect to WebSocket server
↓
3. Validate API token
↓
4. Send system log
↓
5. Start receiving messagesMessage Processing
System Log
json
{
"type": "SYSTEM_LOG",
"sender_id": 7,
"sender_name": "Storage-Custodian",
"sender_role": "AGENT",
"payload": {
"level": "SUCCESS",
"content": "Storage-Custodian initialized. Monitoring /path/to/hive_storage. 10 files detected."
}
}Respond to Instruction (Butler path)
json
{
"type": "CHAT",
"sender_id": 7,
"sender_name": "Storage-Custodian",
"sender_role": "AGENT",
"target_id": 2,
"payload": "Storage Status: 10 files, 102400 bytes",
"timestamp": "2024-01-01T00:00:00Z"
}Respond to Instruction (User path with persistence)
json
{
"type": "CHAT_STREAM",
"sender_id": 7,
"sender_name": "Storage-Custodian",
"sender_role": "AGENT",
"target_id": 1,
"stream_id": "stream_123",
"payload": "partial chunk..."
}json
{
"type": "CHAT",
"sender_id": 7,
"sender_name": "Storage-Custodian",
"sender_role": "AGENT",
"target_id": 1,
"stream_id": "stream_123",
"payload": "final full response for history persistence"
}json
{
"type": "CHAT_STREAM_END",
"sender_id": 7,
"sender_name": "Storage-Custodian",
"sender_role": "AGENT",
"target_id": 1,
"stream_id": "stream_123"
}Commands
get_status
Get storage status:
bash
get_statusResponse:
Storage Status: 10 files, 102400 byteslist_files
List files:
bash
list_filesResponse:
Files:
- file1.txt (1024 bytes)
- file2.txt (2048 bytes)get_file
Get file:
bash
get_file filenameResponse:
File content...Example
Start Storage-Custodian
bash
cd backend
python3 mock_agents/storage_custodian.pyCheck Storage Status
python
# Storage-Custodian sends system log automatically after startup
{
"type": "SYSTEM_LOG",
"sender_id": 7,
"sender_name": "Storage-Custodian",
"sender_role": "AGENT",
"payload": {
"level": "SUCCESS",
"content": "Storage-Custodian initialized. Monitoring /path/to/hive_storage. 10 files detected."
}
}Query Storage Status
Butler can send instructions:
json
{
"type": "CHAT",
"sender_id": 2,
"sender_name": "Butler",
"sender_role": "BUTLER",
"target_id": 7,
"payload": "get_status",
"timestamp": "2024-01-01T00:00:00Z"
}Storage-Custodian responds:
json
{
"type": "CHAT",
"sender_id": 7,
"sender_name": "Storage-Custodian",
"sender_role": "AGENT",
"target_id": 2,
"payload": "Storage Status: 10 files, 102400 bytes",
"timestamp": "2024-01-01T00:00:01Z"
}Best Practices
1. Error Handling
python
async def handle_message(msg):
try:
if msg["type"] == "CHAT":
response = await handle_command(msg["payload"])
await ws.send(json.dumps({
"type": "CHAT",
"sender_id": 7,
"sender_name": "Storage-Custodian",
"sender_role": "AGENT",
"target_id": msg["sender_id"],
"payload": response,
"timestamp": datetime.utcnow().isoformat()
}))
except Exception as e:
log.error(f"Error handling message: {e}")2. Logging
python
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def handle_message(msg):
logger.info(f"Received message: {msg}")
# Process message3. Status Monitoring
python
def get_storage_stats():
try:
files = []
total_size = 0
for f in os.listdir(STORAGE_DIR):
fp = os.path.join(STORAGE_DIR, f)
if os.path.isfile(fp):
files.append({
"name": os.path.basename(fp),
"size": os.path.getsize(fp),
"modified": datetime.fromtimestamp(os.path.getmtime(fp))
})
total_size += os.path.getsize(fp)
return {
"total_files": len(files),
"total_size": total_size,
"files": files
}
except Exception as e:
logger.error(f"Error getting storage stats: {e}")
return {"error": str(e)}Scalability
Adding New Commands
- Add command parsing in
handle_command. - Implement command logic.
- Return response.
Adding New Features
- Add new functions in
get_storage_stats. - Update message handling.
- Test functionality.
Performance Optimization
- Asynchronous processing
- Connection pool
- Caching
- Concurrent processing
Security
- API token validation
- Input validation
- Error handling
Troubleshooting
Connection Failed
Check:
- Backend service is running.
- WebSocket URL is correct.
- API token is valid.
Message Not Sent
Check:
- WebSocket connection is normal.
- Message format is correct.
- Network is normal.
Storage Directory Does Not Exist
Storage-Custodian will automatically create the storage directory:
python
if not os.path.exists(STORAGE_DIR):
os.makedirs(STORAGE_DIR)