Files
X-Financial/server/tests/test_employee_behavior_profile_service.py
caoxiaozhu 92444e7eae feat: 扩展风险规则体系、审批动态路由与预算中心列表化改造
- 新增 25+ 条风险规则(预算/报销/申请/通用类),完善风险规则模拟与反馈发布机制
- 引入费用审批动态路由、平台风险分级、预审与风险阶段管理
- 预算中心列表化改造,优化票据夹仪表盘与数字员工工作看板
- 新增 Hermes 风险线索收集器、Agent 链路追踪中心
- 扩展数字员工能力库(18 个领域 Skill)与交通费用自动预估
- 完善报销申请快速预览、权限控制与前端测试覆盖
2026-06-01 17:07:14 +08:00

503 lines
16 KiB
Python

from __future__ import annotations
from collections.abc import Generator
from datetime import UTC, date, datetime, timedelta
from decimal import Decimal
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.api.deps import get_db
from app.db.base import Base
from app.main import create_app
from app.models.agent_run import AgentRun, AgentToolCall
from app.models.employee import Employee
from app.models.employee_behavior_profile import EmployeeBehaviorProfileSnapshot
from app.models.financial_record import ExpenseClaim, ExpenseClaimItem
from app.models.organization import OrganizationUnit
from app.models.user_session_metric import UserSessionMetric
from app.services.employee_behavior_profile_service import EmployeeBehaviorProfileService
from app.services.hermes_employee_profile_scanner import HermesEmployeeProfileScannerService
from app.services.hermes_scheduler import HermesScheduler
def build_session_factory() -> sessionmaker[Session]:
    """Return a session factory bound to a fresh in-memory SQLite database.

    The engine is created with ``StaticPool`` and ``check_same_thread``
    disabled, and every table registered on ``Base.metadata`` is created
    before the factory is handed back.

    Returns:
        A ``sessionmaker`` configured with ``autoflush=False`` and
        ``autocommit=False``.
    """
    # NOTE(review): StaticPool presumably keeps one shared connection so all
    # sessions see the same in-memory database — confirm against SQLAlchemy docs.
    memory_engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(bind=memory_engine)
    factory = sessionmaker(bind=memory_engine, autoflush=False, autocommit=False)
    return factory
def seed_profile_data(db: Session) -> None:
    """Populate *db* with the baseline fixture data used by the profile tests.

    Inserts one organization unit (``dept-sales``), three employees sharing the
    same position and grade (``emp-main`` plus two peers), five expense claims
    built via ``_build_claim`` (three for the main employee, one per peer) and
    one successful ``hermes`` agent run with a single tool call for the main
    employee. The session is committed before returning.

    Args:
        db: Open SQLAlchemy session that receives the fixture rows.
    """
    sales_unit = OrganizationUnit(
        id="dept-sales",
        unit_code="SALES",
        name="市场部",
        unit_type="department",
    )

    def _account_manager(emp_id: str, emp_no: str, full_name: str, mail: str) -> Employee:
        # All seeded employees share position, grade and organization unit.
        return Employee(
            id=emp_id,
            employee_no=emp_no,
            name=full_name,
            email=mail,
            position="客户经理",
            grade="P5",
            organization_unit=sales_unit,
        )

    main_employee = _account_manager("emp-main", "E1001", "张三", "zhangsan@example.com")
    first_peer = _account_manager("emp-peer-a", "E1002", "李四", "lisi@example.com")
    second_peer = _account_manager("emp-peer-b", "E1003", "王五", "wangwu@example.com")
    db.add_all([sales_unit, main_employee, first_peer, second_peer])

    reference_time = datetime.now(UTC)

    # (claim id, owner, amount, age in days, extra keyword flags for _build_claim)
    claim_specs = (
        ("claim-main-1", main_employee, "5000", 5, {"missing_attachment": True}),
        ("claim-main-2", main_employee, "4200", 15, {"returned": True}),
        ("claim-main-3", main_employee, "3800", 30, {}),
        ("claim-peer-a", first_peer, "1200", 12, {}),
        ("claim-peer-b", second_peer, "1500", 18, {}),
    )
    seeded_claims = [
        _build_claim(
            claim_id,
            owner.id,
            owner.name,
            Decimal(amount_text),
            reference_time - timedelta(days=age_days),
            **flags,
        )
        for claim_id, owner, amount_text, age_days, flags in claim_specs
    ]
    db.add_all(seeded_claims)

    draft_tool_call = AgentToolCall(
        run_id="run-main-1",
        tool_type="expense",
        tool_name="claim_draft",
        request_json={"message": "出差报销"},
        response_json={"ok": True},
        status="success",
        duration_ms=120,
    )
    main_agent_run = AgentRun(
        run_id="run-main-1",
        agent="hermes",
        source="user_message",
        user_id=main_employee.email,
        status="success",
        result_summary="AI 已辅助生成报销说明。",
        started_at=reference_time - timedelta(days=2),
        tool_calls=[draft_tool_call],
    )
    db.add(main_agent_run)
    db.commit()
def _build_claim(
    claim_id: str,
    employee_id: str,
    employee_name: str,
    amount: Decimal,
    occurred_at: datetime,
    *,
    missing_attachment: bool = False,
    returned: bool = False,
) -> ExpenseClaim:
    """Build a travel ``ExpenseClaim`` fixture carrying exactly one item.

    Args:
        claim_id: Primary key of the claim; also used to derive the claim
            number, the item id and (when present) the invoice id.
        employee_id: Id of the employee the claim belongs to.
        employee_name: Display name stored on the claim.
        amount: Amount used for both the claim and its single item.
        occurred_at: Timestamp used for both ``occurred_at`` and
            ``submitted_at``.
        missing_attachment: When true, the item has no ``invoice_id`` and the
            claim's ``invoice_count`` is 0 instead of 1.
        returned: When true, the claim status is ``"returned"`` and a
            ``manual_return`` risk flag is attached; otherwise the status is
            ``"submitted"`` with no risk flags.

    Returns:
        The unsaved ``ExpenseClaim`` instance.
    """
    has_attachment = not missing_attachment

    if returned:
        claim_status = "returned"
        risk_flags = [{"source": "manual_return", "message": "补充票据"}]
    else:
        claim_status = "submitted"
        risk_flags = []

    travel_item = ExpenseClaimItem(
        id=f"item-{claim_id}",
        claim_id=claim_id,
        # NOTE(review): the item date is today's local date, not occurred_at's date.
        item_date=date.today(),
        item_type="travel",
        item_reason="客户拜访出差",
        item_location="北京",
        item_amount=amount,
        invoice_id=f"invoice-{claim_id}" if has_attachment else None,
    )

    return ExpenseClaim(
        id=claim_id,
        claim_no=f"EXP-{claim_id}",
        employee_id=employee_id,
        employee_name=employee_name,
        department_id="dept-sales",
        department_name="市场部",
        project_code="PRJ-001",
        expense_type="travel",
        reason="客户拜访出差",
        location="北京",
        amount=amount,
        currency="CNY",
        invoice_count=1 if has_attachment else 0,
        occurred_at=occurred_at,
        submitted_at=occurred_at,
        status=claim_status,
        approval_stage="直属领导审批",
        risk_flags_json=risk_flags,
        items=[travel_item],
    )
def add_closed_user_session(
    db: Session,
    *,
    session_id: str,
    username: str,
    display_name: str = "",
    employee_no: str = "",
    duration_ms: int = 30 * 60 * 1000,
) -> None:
    """Insert and commit a closed ``UserSessionMetric`` row.

    The session is recorded as having logged out one minute before "now" (UTC)
    and logged in ``duration_ms`` milliseconds before that.

    Args:
        db: Open SQLAlchemy session that receives the row.
        session_id: Identifier stored on the metric.
        username: Login name; also used as the email when it contains ``@``.
        display_name: Display name; falls back to ``username`` when empty.
        employee_no: Employee number stored on the metric.
        duration_ms: Session length in milliseconds (defaults to 30 minutes).
    """
    closed_at = datetime.now(UTC) - timedelta(minutes=1)
    opened_at = closed_at - timedelta(milliseconds=duration_ms)

    shown_name = display_name or username
    email_address = username if "@" in username else ""

    metric = UserSessionMetric(
        session_id=session_id,
        username=username,
        display_name=shown_name,
        employee_no=employee_no,
        email=email_address,
        login_at=opened_at,
        logout_at=closed_at,
        last_activity_at=closed_at,
        duration_ms=duration_ms,
        activity_event_count=8,
        logout_reason="manual",
        status="closed",
    )
    db.add(metric)
    db.commit()
def test_service_scans_snapshots_and_filters_approval_scene() -> None:
    """Scan profiles for the seeded data, then read the approval-scene view.

    Checks that the scanner reports at least one target employee and persists
    at least four snapshots, and that the latest approval profile for
    ``emp-main`` exposes exactly the ``expense`` and ``process_quality``
    profile types together with a positive review priority score, a peer group
    of at least three, and non-empty tags and radar dimensions.
    """
    make_session = build_session_factory()
    with make_session() as db:
        seed_profile_data(db)

        scanner = HermesEmployeeProfileScannerService(db)
        scan_summary = scanner.scan_employee_profiles(log_id=None)
        assert scan_summary["target_employee_count"] >= 1

        snapshot_total = db.query(EmployeeBehaviorProfileSnapshot).count()
        assert snapshot_total >= 4

        profile_service = EmployeeBehaviorProfileService(db)
        approval_view = profile_service.get_latest_profile(
            employee_id="emp-main",
            scene="approval",
            claim_id="claim-main-1",
            window_days=90,
            expense_type_scope="travel",
        )

        assert approval_view.employee_id == "emp-main"
        returned_types = {profile.profile_type for profile in approval_view.profiles}
        assert returned_types == {"expense", "process_quality"}
        assert approval_view.review_priority_score > 0
        assert approval_view.peer_group.sample_size >= 3
        assert approval_view.profile_tags
        assert approval_view.radar.dimensions
def test_service_resolves_latest_profile_by_employee_name_identifier() -> None:
    """Passing the employee's name as ``employee_id`` still resolves ``emp-main``.

    The profile is requested with the seeded employee's display name instead of
    the primary key; the result must point at ``emp-main``, have an empty
    ``empty_reason`` and contain the two approval-scene profile types.
    """
    make_session = build_session_factory()
    with make_session() as db:
        seed_profile_data(db)

        main_employee = db.get(Employee, "emp-main")
        assert main_employee is not None

        profile_service = EmployeeBehaviorProfileService(db)
        approval_view = profile_service.get_latest_profile(
            employee_id=main_employee.name,
            scene="approval",
            claim_id="claim-main-1",
            window_days=90,
            expense_type_scope="travel",
        )

        assert approval_view.employee_id == "emp-main"
        assert approval_view.empty_reason == ""
        returned_types = {profile.profile_type for profile in approval_view.profiles}
        assert returned_types == {"expense", "process_quality"}
def test_latest_profile_endpoint_returns_approval_payload() -> None:
    """GET ``/employee-profiles/emp-main/latest`` returns the approval payload.

    The app's ``get_db`` dependency is overridden so requests use the seeded
    in-memory database. The response must be 200 and contain the two
    approval-scene profile types, a non-negative review priority score,
    non-empty tags and exactly five expected radar dimension codes.
    """
    make_session = build_session_factory()
    with make_session() as db:
        seed_profile_data(db)

    app = create_app()

    def _db_override() -> Generator[Session, None, None]:
        # Leaving the ``with`` block closes the per-request session.
        with make_session() as request_session:
            yield request_session

    app.dependency_overrides[get_db] = _db_override
    client = TestClient(app)

    query = {
        "scene": "approval",
        "claim_id": "claim-main-1",
        "window_days": 90,
        "expense_type_scope": "travel",
    }
    auth_headers = {"x-auth-username": "auditor", "x-auth-name": "auditor"}
    response = client.get(
        "/api/v1/employee-profiles/emp-main/latest",
        params=query,
        headers=auth_headers,
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["employee_id"] == "emp-main"

    returned_types = {profile["profile_type"] for profile in payload["profiles"]}
    assert returned_types == {"expense", "process_quality"}
    assert payload["review_priority_score"] >= 0
    assert payload["profile_tags"]

    expected_radar_codes = {
        "expense_intensity",
        "application_rhythm",
        "travel_entertainment",
        "material_completeness",
        "process_pressure",
    }
    radar_codes = {dimension["code"] for dimension in payload["radar"]["dimensions"]}
    assert radar_codes == expected_radar_codes
def test_current_employee_profile_endpoint_resolves_login_user() -> None:
    """GET ``/employee-profiles/me/latest`` maps the login user to ``emp-main``.

    After seeding, the 90-day overall profiles for ``emp-main`` are refreshed
    and a closed 30-minute session is recorded for the same login. The request
    is made with ``x-auth-username`` set to the employee's email; the payload
    must include at least the ``expense`` and ``ai_usage`` profiles, and the
    AI-usage metrics must report the expected run and online durations.
    """
    make_session = build_session_factory()
    with make_session() as db:
        seed_profile_data(db)
        EmployeeBehaviorProfileService(db).refresh_employee_profiles(
            employee_id="emp-main",
            window_days=(90,),
            expense_type_scope="overall",
        )
        add_closed_user_session(
            db,
            session_id="session-employee-current",
            username="zhangsan@example.com",
            display_name="张三",
            employee_no="E1001",
        )

    app = create_app()

    def _db_override() -> Generator[Session, None, None]:
        # Leaving the ``with`` block closes the per-request session.
        with make_session() as request_session:
            yield request_session

    app.dependency_overrides[get_db] = _db_override
    client = TestClient(app)

    query = {
        "scene": "operations",
        "window_days": 90,
        "expense_type_scope": "overall",
    }
    response = client.get(
        "/api/v1/employee-profiles/me/latest",
        params=query,
        headers={"x-auth-username": "zhangsan@example.com"},
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["employee_id"] == "emp-main"

    returned_types = {profile["profile_type"] for profile in payload["profiles"]}
    assert returned_types >= {"expense", "ai_usage"}

    ai_profile = next(
        profile for profile in payload["profiles"] if profile["profile_type"] == "ai_usage"
    )
    ai_metrics = ai_profile["metrics"]
    half_hour_ms = 30 * 60 * 1000
    # 120 matches the duration_ms of the single seeded tool call.
    assert ai_metrics["ai_run_duration_ms"] == 120
    assert ai_metrics["online_duration_ms"] == half_hour_ms
    assert ai_metrics["usage_duration_ms"] == half_hour_ms
    assert ai_metrics["usage_duration_mode"] == "online_session"

    assert payload["profile_tags"]
    assert payload["radar"]["dimensions"]
def test_current_admin_profile_endpoint_returns_account_usage_profile() -> None:
    """An admin login with agent runs gets an ``ai_usage``-only profile.

    Twelve successful ``user_agent`` runs are stored for user ``admin``, each
    finishing two seconds after it starts and carrying one tool call. The
    ``/me/latest`` payload must identify the account as ``admin``, contain only
    the ``ai_usage`` profile, and report 12 runs with a total run duration of
    24000 ms (12 runs x 2 s).
    """
    make_session = build_session_factory()
    with make_session() as db:
        seed_profile_data(db)
        reference_time = datetime.now(UTC)

        for offset in range(12):
            admin_run_id = f"run-admin-usage-{offset}"
            # Runs are spaced one minute apart, starting a day before "now".
            run_started = reference_time - timedelta(days=1, minutes=offset)
            listing_call = AgentToolCall(
                run_id=admin_run_id,
                tool_type="database",
                tool_name="agent_runs.list",
                request_json={"limit": 20},
                response_json={"ok": True},
                status="success",
                duration_ms=120,
            )
            admin_run = AgentRun(
                run_id=admin_run_id,
                agent="user_agent",
                source="user_message",
                user_id="admin",
                status="success",
                result_summary="管理员查看运行概览。",
                started_at=run_started,
                finished_at=run_started + timedelta(seconds=2),
                tool_calls=[listing_call],
            )
            db.add(admin_run)
        db.commit()

    app = create_app()

    def _db_override() -> Generator[Session, None, None]:
        # Leaving the ``with`` block closes the per-request session.
        with make_session() as request_session:
            yield request_session

    app.dependency_overrides[get_db] = _db_override
    client = TestClient(app)

    query = {
        "scene": "operations",
        "window_days": 90,
        "expense_type_scope": "overall",
    }
    admin_headers = {
        "x-auth-username": "admin",
        "x-auth-name": "admin",
        "x-auth-is-admin": "true",
    }
    response = client.get(
        "/api/v1/employee-profiles/me/latest",
        params=query,
        headers=admin_headers,
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["employee_id"] == "admin"
    assert payload["empty_reason"] == ""

    returned_types = [profile["profile_type"] for profile in payload["profiles"]]
    assert returned_types == ["ai_usage"]

    usage_metrics = payload["profiles"][0]["metrics"]
    assert usage_metrics["ai_run_count"] == 12
    assert usage_metrics["ai_run_duration_ms"] == 24000

    assert payload["profile_tags"]
    assert payload["radar"]["dimensions"]
def test_current_admin_profile_endpoint_uses_online_session_without_agent_runs() -> None:
    """With no agent runs, admin usage falls back to the online session time.

    Only a closed 12-minute session for ``admin`` is stored (no seed data, no
    agent runs). The ``/me/latest`` payload must still be non-empty, report
    zero AI runs, and use the session's duration for both the online and usage
    durations with ``usage_duration_mode`` set to ``"online_session"``.
    """
    twelve_minutes_ms = 12 * 60 * 1000

    make_session = build_session_factory()
    with make_session() as db:
        add_closed_user_session(
            db,
            session_id="session-admin-online",
            username="admin",
            display_name="admin",
            duration_ms=twelve_minutes_ms,
        )

    app = create_app()

    def _db_override() -> Generator[Session, None, None]:
        # Leaving the ``with`` block closes the per-request session.
        with make_session() as request_session:
            yield request_session

    app.dependency_overrides[get_db] = _db_override
    client = TestClient(app)

    query = {
        "scene": "operations",
        "window_days": 90,
        "expense_type_scope": "overall",
    }
    admin_headers = {
        "x-auth-username": "admin",
        "x-auth-name": "admin",
        "x-auth-is-admin": "true",
    }
    response = client.get(
        "/api/v1/employee-profiles/me/latest",
        params=query,
        headers=admin_headers,
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["employee_id"] == "admin"
    assert payload["empty_reason"] == ""

    usage_metrics = payload["profiles"][0]["metrics"]
    assert usage_metrics["ai_run_count"] == 0
    assert usage_metrics["online_duration_ms"] == twelve_minutes_ms
    assert usage_metrics["usage_duration_ms"] == twelve_minutes_ms
    assert usage_metrics["usage_duration_mode"] == "online_session"
def test_finish_session_endpoint_closes_active_session() -> None:
    """POSTing to the finish endpoint closes an active session metric.

    An ``active`` session that logged in nine minutes ago is stored, then
    ``/auth/sessions/{id}/finish`` is called with a manual reason and an
    activity count of 5. The response must echo the session id with a positive
    duration, and the stored row must end up ``closed`` with the submitted
    activity event count.
    """
    tracked_session_id = "session-active-finish"

    make_session = build_session_factory()
    started_at = datetime.now(UTC) - timedelta(minutes=9)
    with make_session() as db:
        active_metric = UserSessionMetric(
            session_id=tracked_session_id,
            username="zhangsan@example.com",
            display_name="张三",
            employee_no="E1001",
            email="zhangsan@example.com",
            login_at=started_at,
            last_activity_at=started_at,
            status="active",
        )
        db.add(active_metric)
        db.commit()

    app = create_app()

    def _db_override() -> Generator[Session, None, None]:
        # Leaving the ``with`` block closes the per-request session.
        with make_session() as request_session:
            yield request_session

    app.dependency_overrides[get_db] = _db_override
    client = TestClient(app)

    finish_body = {
        "reason": "manual",
        "lastActivityAt": datetime.now(UTC).isoformat(),
        "activityEventCount": 5,
        "pagePath": "/workbench",
    }
    response = client.post(
        "/api/v1/auth/sessions/session-active-finish/finish",
        json=finish_body,
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["sessionId"] == tracked_session_id
    assert payload["durationMs"] > 0

    # Re-read the row through a fresh session to check what was persisted.
    with make_session() as db:
        stored_metric = (
            db.query(UserSessionMetric).filter_by(session_id=tracked_session_id).one()
        )
        assert stored_metric.status == "closed"
        assert stored_metric.activity_event_count == 5
def test_hermes_scheduler_parses_weekly_profile_cron() -> None:
    """``HermesScheduler._parse_simple_cron`` handles weekly, daily and bad input.

    Valid five-field expressions are expected to parse into a 3-tuple
    (presumably minute, hour, weekday — the last element is ``None`` when the
    weekday field is ``*``); an unparseable string must yield ``None``.
    """
    scheduler = HermesScheduler()

    parse_expectations = (
        ("30 8 * * 1", (30, 8, 0)),
        ("0 9 * * *", (0, 9, None)),
    )
    for expression, expected in parse_expectations:
        assert scheduler._parse_simple_cron(expression) == expected

    assert scheduler._parse_simple_cron("bad cron") is None