from __future__ import annotations

from typing import Any

from app.schemas.steward import (
    StewardActionStatus,
    StewardActionStep,
    StewardPlanResponse,
    StewardTask,
)


class StewardActionPlanBuilder:
    """Convert Xiaocai steward tasks into deterministic, whitelisted action steps."""

    def attach_action_steps(self, plan: StewardPlanResponse) -> StewardPlanResponse:
        """Return a copy of ``plan`` with action steps attached.

        A plan without tasks is copied with an empty ``action_steps`` list.
        Otherwise every task is copied with its own ``action_steps``, and the
        plan-level list starts with the detect-intent step followed by each
        task's steps in task order.
        """
        if not plan.tasks:
            return plan.model_copy(update={"action_steps": []})

        collected_steps = [self._build_detect_intent_step(plan)]
        enriched_tasks: list[StewardTask] = []
        for current in plan.tasks:
            current_steps = self.build_task_action_steps(current)
            enriched_tasks.append(
                current.model_copy(update={"action_steps": current_steps})
            )
            collected_steps.extend(current_steps)

        return plan.model_copy(
            update={"tasks": enriched_tasks, "action_steps": collected_steps}
        )

    def build_task_action_steps(self, task: StewardTask) -> list[StewardActionStep]:
        """Build the action steps for a single task.

        An intent returned by the registry for ``task.task_type`` takes
        precedence. Without one, the built-in builders cover
        ``"expense_application"`` and ``"reimbursement"``; any other task type
        produces an empty list.
        """
        # Imported at call time rather than at module level.
        # NOTE(review): presumably this avoids an import cycle - confirm.
        from app.services.steward_intent_registry import get_intent

        registered = get_intent(task.task_type)
        if registered is not None:
            return registered.action_steps_builder(task)

        builtin_builders = {
            "expense_application": self.build_application_steps,
            "reimbursement": self.build_reimbursement_steps,
        }
        fallback = builtin_builders.get(task.task_type)
        if fallback is None:
            return []
        return fallback(task)

    def build_application_steps(self, task: StewardTask) -> list[StewardActionStep]:
        """Public entry point for the expense-application step sequence."""
        return self._build_application_steps(task)

    def build_reimbursement_steps(self, task: StewardTask) -> list[StewardActionStep]:
        """Public entry point for the reimbursement step sequence."""
        return self._build_reimbursement_steps(task)

    def _build_application_steps(self, task: StewardTask) -> list[StewardActionStep]:
        """Build the expense-application steps.

        Steps 1-3 (fill, preview, validate) are always produced. A
        ``"save_draft"`` request adds a draft-saving step 4; a ``"submit"``
        request adds a duplicate precheck (step 4) and the submission itself
        (step 5). Any other requested action adds nothing.
        """
        fill_step = self._build_task_step(
            task,
            1,
            "fill_application_fields",
            "填充申请字段",
            payload=self._field_payload(task),
        )
        preview_step = self._build_task_step(
            task,
            2,
            "build_application_preview",
            "展示申请核对表",
            depends_on_index=1,
            payload={"task_id": task.task_id, "ontology_fields": task.ontology_fields},
        )
        validation_step = self._build_task_step(
            task,
            3,
            "validate_required_fields",
            "校验申请必填字段",
            status=self._validation_status(task),
            depends_on_index=2,
            payload={"missing_fields": task.missing_fields},
        )
        steps = [fill_step, preview_step, validation_step]

        action = task.requested_action
        if action == "save_draft":
            draft_payload = {
                "task_id": task.task_id,
                "requested_action": task.requested_action,
                "ontology_fields": task.ontology_fields,
            }
            steps.append(
                self._build_task_step(
                    task,
                    4,
                    "save_application_draft",
                    "保存申请草稿",
                    status=self._side_effect_status(task, requires_confirmation=False),
                    depends_on_index=3,
                    payload=draft_payload,
                )
            )
            return steps

        if action != "submit":
            return steps

        precheck_step = self._build_task_step(
            task,
            4,
            "run_duplicate_precheck",
            "检查重复或冲突申请",
            status=self._side_effect_status(task, requires_confirmation=False),
            depends_on_index=3,
            payload={"task_id": task.task_id, "precheck_type": "travel_overlap"},
        )
        # The step only asks for confirmation when nothing is missing; with
        # missing fields its status is "blocked" instead.
        submit_step = self._build_task_step(
            task,
            5,
            "submit_application",
            "提交申请审批",
            status=self._side_effect_status(task, requires_confirmation=True),
            requires_confirmation=not task.missing_fields,
            depends_on_index=4,
            payload={
                "task_id": task.task_id,
                "requested_action": task.requested_action,
                "confirmation_required": True,
            },
        )
        steps.extend([precheck_step, submit_step])
        return steps

    def _build_reimbursement_steps(self, task: StewardTask) -> list[StewardActionStep]:
        """Build the reimbursement steps.

        Steps 1-4 (fill, preview, validate, create draft) are always produced.
        Step 5, associating attachments, is added only when
        ``task.ontology_fields`` holds a truthy ``"attachments"`` entry.
        """
        fill_step = self._build_task_step(
            task,
            1,
            "fill_reimbursement_fields",
            "填充报销字段",
            payload=self._field_payload(task),
        )
        preview_step = self._build_task_step(
            task,
            2,
            "build_reimbursement_preview",
            "展示报销核对表",
            depends_on_index=1,
            payload={"task_id": task.task_id, "ontology_fields": task.ontology_fields},
        )
        validation_step = self._build_task_step(
            task,
            3,
            "validate_required_fields",
            "校验报销必填字段",
            status=self._validation_status(task),
            depends_on_index=2,
            payload={"missing_fields": task.missing_fields},
        )
        draft_step = self._build_task_step(
            task,
            4,
            "create_reimbursement_draft",
            "创建报销草稿",
            status=self._side_effect_status(task, requires_confirmation=False),
            depends_on_index=3,
            payload={
                "task_id": task.task_id,
                "requested_action": task.requested_action,
                "ontology_fields": task.ontology_fields,
            },
        )
        steps = [fill_step, preview_step, validation_step, draft_step]

        attachments = task.ontology_fields.get("attachments")
        if not attachments:
            return steps

        steps.append(
            self._build_task_step(
                task,
                5,
                "associate_attachments",
                "关联报销附件",
                status=self._side_effect_status(task, requires_confirmation=False),
                depends_on_index=4,
                payload={"task_id": task.task_id, "attachments": attachments},
            )
        )
        return steps

    @staticmethod
    def _build_detect_intent_step(plan: StewardPlanResponse) -> StewardActionStep:
        """Build the plan-level, already-completed detect-intent step."""
        intent_payload = {
            "planning_source": plan.planning_source,
            "plan_status": plan.plan_status,
        }
        return StewardActionStep(
            step_id="plan:00:detect_intent",
            action_type="detect_intent",
            label="识别业务意图",
            status="completed",
            payload=intent_payload,
        )

    def _build_task_step(
        self,
        task: StewardTask,
        index: int,
        action_type: str,
        label: str,
        *,
        status: StewardActionStatus = "planned",
        requires_confirmation: bool = False,
        depends_on_index: int | None = None,
        payload: dict[str, Any] | None = None,
    ) -> StewardActionStep:
        """Build one step targeting ``task``.

        The step id comes from ``_step_id(task, index)``. When
        ``depends_on_index`` is given, the step depends on the single step
        with that index for the same task. A missing or empty ``payload``
        becomes a fresh empty dict.
        """
        own_id = self._step_id(task, index)

        prerequisites: list[str] = []
        if depends_on_index is not None:
            prerequisites.append(self._step_id(task, depends_on_index))

        return StewardActionStep(
            step_id=own_id,
            action_type=action_type,  # type: ignore[arg-type]
            label=label,
            target_task_id=task.task_id,
            status=status,
            requires_confirmation=requires_confirmation,
            depends_on=prerequisites,
            payload=payload if payload else {},
        )

    @staticmethod
    def _field_payload(task: StewardTask) -> dict[str, Any]:
        """Return the task attributes used as the payload of a fill-fields step."""
        exported = (
            "task_id",
            "task_type",
            "requested_action",
            "ontology_fields",
            "missing_fields",
        )
        return {name: getattr(task, name) for name in exported}

    @staticmethod
    def _validation_status(task: StewardTask) -> StewardActionStatus:
        """Return ``"blocked"`` when the task has missing fields, else ``"planned"``."""
        if task.missing_fields:
            return "blocked"
        return "planned"

    @staticmethod
    def _side_effect_status(
        task: StewardTask,
        *,
        requires_confirmation: bool,
    ) -> StewardActionStatus:
        """Return the status of a step that performs a side effect.

        Missing fields always give ``"blocked"``. Otherwise the result is
        ``"pending_confirmation"`` when confirmation is required and
        ``"planned"`` when it is not.
        """
        if not task.missing_fields:
            return "pending_confirmation" if requires_confirmation else "planned"
        return "blocked"

    @staticmethod
    def _step_id(task: StewardTask, index: int) -> str:
        """Return the step id: the task id, a colon, then the zero-padded two-digit index."""
        owner = task.task_id
        return f"{owner}:{index:02d}"