# Source file: JARVIS/backend/app/tools/implementations/task_manager.py
# Original listing: 195 lines, 5.5 KiB, Python

"""
Task Manager Tool
Task creation, management and status tracking.
"""
import uuid
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class TaskStatus(str, Enum):
    """Lifecycle states a task can be in.

    The ``str`` mixin makes every member compare equal to its plain string
    value, and ``TaskStatus("<value>")`` looks a member up by that string
    (raising ``ValueError`` for an unknown value).
    """

    # Initial state of a task that has not started.
    PENDING = "pending"
    # The task is in progress.
    RUNNING = "running"
    # Terminal state: the task finished successfully.
    COMPLETED = "completed"
    # Terminal state: the task ended with an error.
    FAILED = "failed"
@dataclass
class Task:
    """Record describing one task and its lifecycle data.

    Only ``id``, ``name`` and ``description`` are required; every other
    field has a default so a new task starts out pending, unscheduled and
    without an outcome.
    """

    # --- identity (required) ---
    id: str
    name: str
    description: str

    # --- lifecycle state ---
    status: TaskStatus = TaskStatus.PENDING

    # --- timestamps ---
    # Filled in with datetime.utcnow() when the instance is constructed.
    created_at: datetime = field(default_factory=datetime.utcnow)
    # Optional scheduled time; None when the task is unscheduled.
    scheduled_at: Optional[datetime] = None
    # None until a completion time is recorded.
    completed_at: Optional[datetime] = None

    # --- outcome ---
    # Arbitrary result payload; None until one is recorded.
    result: Optional[Any] = None
    # Error message; None until one is recorded.
    error: Optional[str] = None
class TaskManager:
    """In-memory task manager tool.

    Tasks are kept in a dict keyed by their short id for the lifetime of
    this instance. Every public method is a coroutine that returns a dict
    whose ``"status"`` is ``"success"`` or ``"error"``; error responses
    carry a message under ``"error"`` and successful responses that have a
    payload carry it under ``"result"``.
    """

    def __init__(self, config: dict):
        """Create an empty manager.

        Args:
            config: Tool configuration. Accepted for interface
                compatibility; no setting is read from it here.
        """
        self._tasks: Dict[str, Task] = {}

    def _new_task_id(self) -> str:
        """Return an 8-character id that is not used by any stored task."""
        # Only the first 8 hex digits (32 bits) of the UUID are kept, so a
        # collision is possible; retry instead of silently overwriting an
        # existing task.
        while True:
            candidate = str(uuid.uuid4())[:8]
            if candidate not in self._tasks:
                return candidate

    @staticmethod
    def _parse_scheduled_at(value: Any) -> Any:
        """Normalise a ``scheduled_at`` argument.

        Strings are parsed as ISO 8601 timestamps, a blank string becomes
        ``None``, and any non-string value is returned unchanged.

        Raises:
            ValueError: If a non-blank string is not a valid ISO timestamp.
        """
        if not isinstance(value, str):
            return value
        text = value.strip()
        if not text:
            return None
        # datetime.fromisoformat only accepts a trailing "Z" from
        # Python 3.11 on; spell the UTC offset out for older versions.
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        return datetime.fromisoformat(text)

    @staticmethod
    def _iso(moment: Optional[datetime]) -> Optional[str]:
        """Return ``moment.isoformat()``, or ``None`` for a falsy value."""
        return moment.isoformat() if moment else None

    async def create_task(
        self,
        name: str,
        description: str,
        scheduled_at: Optional[datetime] = None,
    ) -> Dict[str, Any]:
        """Create and store a new pending task.

        Args:
            name: Short task name.
            description: Free-form task description.
            scheduled_at: Optional scheduled time. A ``datetime`` is stored
                as is; an ISO 8601 string is parsed first, because storing
                the raw string would make ``list_tasks`` fail when it calls
                ``isoformat()`` on it.

        Returns:
            A success response with the new task's id, name, status and
            creation time, or an error response when ``scheduled_at`` is a
            string that cannot be parsed.
        """
        try:
            when = self._parse_scheduled_at(scheduled_at)
        except ValueError:
            return {
                "status": "error",
                "error": f"Invalid scheduled_at: {scheduled_at}",
            }

        task_id = self._new_task_id()
        task = Task(
            id=task_id,
            name=name,
            description=description,
            scheduled_at=when,
        )
        self._tasks[task_id] = task
        return {
            "status": "success",
            "result": {
                "id": task_id,
                "name": task.name,
                "status": task.status.value,
                "created_at": task.created_at.isoformat(),
            },
        }

    async def list_tasks(
        self,
        status: Optional[str] = None,
    ) -> Dict[str, Any]:
        """List stored tasks, optionally filtered by status value.

        Args:
            status: When truthy, only tasks whose status value equals this
                string are returned. A string that matches no status value
                yields an empty list rather than an error.

        Returns:
            A success response whose result is a list of task summaries.
        """
        tasks = list(self._tasks.values())
        if status:
            tasks = [t for t in tasks if t.status.value == status]
        return {
            "status": "success",
            "result": [
                {
                    "id": t.id,
                    "name": t.name,
                    "description": t.description,
                    "status": t.status.value,
                    "created_at": t.created_at.isoformat(),
                    "scheduled_at": self._iso(t.scheduled_at),
                }
                for t in tasks
            ],
        }

    async def get_task(self, task_id: str) -> Dict[str, Any]:
        """Return the full details of one task.

        Args:
            task_id: Id returned by ``create_task``.

        Returns:
            A success response with the task's fields, or an error response
            when no task has that id.
        """
        task = self._tasks.get(task_id)
        if not task:
            return {"status": "error", "error": "Task not found"}
        return {
            "status": "success",
            "result": {
                "id": task.id,
                "name": task.name,
                "description": task.description,
                "status": task.status.value,
                "result": task.result,
                "error": task.error,
                "created_at": task.created_at.isoformat(),
                # Reported here as well so the detail view is not missing a
                # field that list_tasks exposes.
                "scheduled_at": self._iso(task.scheduled_at),
                "completed_at": self._iso(task.completed_at),
            },
        }

    async def update_task_status(
        self,
        task_id: str,
        status: str,
    ) -> Dict[str, Any]:
        """Set a task's status from its string value.

        Only the status field changes; ``completed_at``, ``result`` and
        ``error`` are left untouched.

        Args:
            task_id: Id of the task to update.
            status: One of the ``TaskStatus`` values.

        Returns:
            A success response, or an error response when the task does not
            exist or ``status`` is not a valid status value.
        """
        task = self._tasks.get(task_id)
        if not task:
            return {"status": "error", "error": "Task not found"}
        try:
            new_status = TaskStatus(status)
        except ValueError:
            return {"status": "error", "error": f"Invalid status: {status}"}
        task.status = new_status
        return {"status": "success"}

    async def complete_task(
        self,
        task_id: str,
        result: Any,
    ) -> Dict[str, Any]:
        """Mark a task as completed and record its result.

        Args:
            task_id: Id of the task to complete.
            result: Value stored as the task's result.

        Returns:
            A success response, or an error response when the task does not
            exist.
        """
        task = self._tasks.get(task_id)
        if not task:
            return {"status": "error", "error": "Task not found"}
        task.status = TaskStatus.COMPLETED
        task.result = result
        task.completed_at = datetime.utcnow()
        return {"status": "success"}

    async def fail_task(
        self,
        task_id: str,
        error: str,
    ) -> Dict[str, Any]:
        """Mark a task as failed and record the error message.

        Args:
            task_id: Id of the task to fail.
            error: Message stored as the task's error.

        Returns:
            A success response, or an error response when the task does not
            exist.
        """
        task = self._tasks.get(task_id)
        if not task:
            return {"status": "error", "error": "Task not found"}
        task.status = TaskStatus.FAILED
        task.error = error
        task.completed_at = datetime.utcnow()
        return {"status": "success"}

    async def delete_task(self, task_id: str) -> Dict[str, Any]:
        """Remove a task from the manager.

        Args:
            task_id: Id of the task to delete.

        Returns:
            A success response, or an error response when the task does not
            exist.
        """
        if task_id not in self._tasks:
            return {"status": "error", "error": "Task not found"}
        del self._tasks[task_id]
        return {"status": "success"}
def create_task_manager_executor(config: dict):
    """Build an async executor bound to a fresh ``TaskManager``.

    Args:
        config: Passed unchanged to the ``TaskManager`` constructor.

    Returns:
        A coroutine function ``execute(command, parameters)``. It awaits
        the manager method named by ``command``, passing ``parameters`` as
        keyword arguments, and returns that method's response dict. An
        unrecognised command yields an error response instead.
    """
    manager = TaskManager(config)

    # Command name -> bound manager coroutine method.
    handlers = {
        "create_task": manager.create_task,
        "list_tasks": manager.list_tasks,
        "get_task": manager.get_task,
        "update_task_status": manager.update_task_status,
        "complete_task": manager.complete_task,
        "fail_task": manager.fail_task,
        "delete_task": manager.delete_task,
    }

    async def execute(command: str, parameters: dict) -> dict:
        try:
            handler = handlers[command]
        except (KeyError, TypeError):
            # TypeError: an unhashable command (e.g. a list) cannot be a
            # dict key; it is reported as unknown like any other miss.
            return {"status": "error", "error": f"Unknown command: {command}"}
        return await handler(**parameters)

    return execute