Files
JARVIS/backend/app/services/admin_bootstrap_service.py

184 lines
6.4 KiB
Python
Raw Permalink Normal View History

from sqlalchemy import or_, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.skill import Skill
from app.models.user import User
from app.services.auth_service import get_password_hash
BUILTIN_SKILLS = [
{
'name': '今日重点拆解',
'description': '帮助日程规划师从上下文中提炼今天最值得推进的事项。',
'instructions': '优先识别今天最关键的 1-3 个重点,说明原因,并给出可执行顺序。',
'agent_type': 'schedule_planner',
'tools': ['calendar', 'tasks'],
'visibility': 'market',
},
{
'name': '周计划编排',
'description': '把本周目标整理成可落地的节奏与时间块。',
'instructions': '将目标拆成周内节奏安排,明确先后顺序、时间块与缓冲。',
'agent_type': 'schedule_planner',
'tools': ['calendar'],
'visibility': 'market',
},
{
'name': '时间冲突分析',
'description': '识别任务、日程与优先级之间的冲突。',
'instructions': '分析冲突来源、影响和推荐取舍,必要时给出替代方案。',
'agent_type': 'schedule_planner',
'tools': ['calendar', 'tasks'],
'visibility': 'market',
},
{
'name': '任务执行 SOP',
'description': '为执行角色提供标准执行步骤和结果回报格式。',
'instructions': '执行前先确认目标与边界,执行中记录关键动作,执行后输出结果、风险与下一步。',
'agent_type': 'executor',
'tools': ['shell', 'api_calls'],
'visibility': 'market',
},
{
'name': '外部交互推进',
'description': '支持论坛、外部接口或内容发布类动作。',
'instructions': '围绕外部交互任务,优先保证动作完整、结果清晰、反馈及时。',
'agent_type': 'executor',
'tools': ['api_calls', 'git'],
'visibility': 'market',
},
{
'name': '知识检索摘要',
'description': '从知识中枢中提炼与当前问题最相关的信息。',
'instructions': '检索后只保留当前决策需要的内容,输出摘要、来源与缺口。',
'agent_type': 'librarian',
'tools': ['web_search', 'database'],
'visibility': 'market',
},
{
'name': '图谱沉淀策略',
'description': '帮助知识管理员把零散信息沉淀为结构化关系。',
'instructions': '识别应沉淀的实体、关系与后续可检索维度。',
'agent_type': 'librarian',
'tools': ['database'],
'visibility': 'market',
},
{
'name': '风险识别模板',
'description': '帮助分析师快速识别当前推进中的风险点。',
'instructions': '从进度、依赖、资源与外部信号中提炼风险,并按严重度排序。',
'agent_type': 'analyst',
'tools': ['database', 'api_calls'],
'visibility': 'market',
},
{
'name': '趋势洞察模板',
'description': '把多源状态汇总为趋势与判断。',
'instructions': '对比近期变化,输出趋势、证据、判断与建议动作。',
'agent_type': 'analyst',
'tools': ['database', 'code_execution'],
'visibility': 'market',
},
]
def _is_bootstrap_enabled(settings) -> bool:
return bool(settings.ADMIN.strip() and settings.ADMIN_EMAIL.strip() and settings.ADMIN_PASSWORD.strip())
async def ensure_admin_user(db: AsyncSession, settings) -> None:
if not _is_bootstrap_enabled(settings):
return
result = await db.execute(
select(User).where(
or_(User.username == settings.ADMIN.strip(), User.email == settings.ADMIN_EMAIL.strip())
)
)
existing_user = result.scalar_one_or_none()
if existing_user:
if (
existing_user.username == settings.ADMIN.strip()
and existing_user.email == settings.ADMIN_EMAIL.strip()
and existing_user.is_superuser
):
return
raise RuntimeError('admin bootstrap identity conflict')
admin_user = User(
username=settings.ADMIN.strip(),
email=settings.ADMIN_EMAIL.strip(),
hashed_password=get_password_hash(settings.ADMIN_PASSWORD),
full_name=settings.ADMIN_FULL_NAME or None,
is_active=True,
is_superuser=True,
)
db.add(admin_user)
try:
await db.commit()
except IntegrityError:
await db.rollback()
result = await db.execute(
select(User).where(
or_(User.username == settings.ADMIN.strip(), User.email == settings.ADMIN_EMAIL.strip())
)
)
existing_user = result.scalar_one_or_none()
if (
existing_user
and existing_user.username == settings.ADMIN.strip()
and existing_user.email == settings.ADMIN_EMAIL.strip()
and existing_user.is_superuser
):
return
raise
await db.refresh(admin_user)
async def ensure_builtin_skills(db: AsyncSession, preferred_owner_id: str | None = None) -> None:
owner = None
if preferred_owner_id:
owner_result = await db.execute(
select(User).where(User.id == preferred_owner_id, User.is_active == True)
)
owner = owner_result.scalar_one_or_none()
if not owner:
owner_result = await db.execute(
select(User).where(User.is_active == True).order_by(User.is_superuser.desc(), User.created_at.asc())
)
owner = owner_result.scalars().first()
if not owner:
return
existing_result = await db.execute(select(Skill.name))
existing_names = set(existing_result.scalars().all())
missing_skills = [
Skill(
owner_id=owner.id,
name=item['name'],
description=item['description'],
instructions=item['instructions'],
agent_type=item['agent_type'],
tools=item['tools'],
required_context=[],
output_format=None,
visibility=item['visibility'],
is_builtin=True,
team_id=None,
is_active=True,
)
for item in BUILTIN_SKILLS
if item['name'] not in existing_names
]
if not missing_skills:
return
db.add_all(missing_skills)
await db.commit()