# Tests for the employee behavior profile service, its HTTP endpoints, the
# session-finish endpoint, and the Hermes scheduler's cron parsing.
from __future__ import annotations

from collections.abc import Generator
from datetime import UTC, date, datetime, timedelta
from decimal import Decimal

from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool

from app.api.deps import get_db
from app.db.base import Base
from app.main import create_app
from app.models.agent_run import AgentRun, AgentToolCall
from app.models.employee import Employee
from app.models.employee_behavior_profile import EmployeeBehaviorProfileSnapshot
from app.models.financial_record import ExpenseClaim, ExpenseClaimItem
from app.models.organization import OrganizationUnit
from app.models.user_session_metric import UserSessionMetric
from app.services.employee_behavior_profile_service import EmployeeBehaviorProfileService
from app.services.hermes_employee_profile_scanner import HermesEmployeeProfileScannerService
from app.services.hermes_scheduler import HermesScheduler


def build_session_factory() -> sessionmaker[Session]:
    """Return a session factory bound to a fresh in-memory SQLite database.

    All tables registered on ``Base.metadata`` are created before the factory
    is returned, so every test starts from an empty schema.
    """
    engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        connect_args={"check_same_thread": False},
        # StaticPool reuses one connection, so every session opened from the
        # factory sees the same in-memory database.
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=engine)
    return sessionmaker(bind=engine, autoflush=False, autocommit=False)


def _build_client(session_factory: sessionmaker[Session]) -> TestClient:
    """Create an app whose ``get_db`` dependency uses ``session_factory``.

    Each request gets its own session from the factory, which is closed once
    the request has been handled.

    Args:
        session_factory: Factory producing sessions on the test database.

    Returns:
        A ``TestClient`` wrapping the freshly created application.
    """
    app = create_app()

    def override_db() -> Generator[Session, None, None]:
        request_db = session_factory()
        try:
            yield request_db
        finally:
            request_db.close()

    app.dependency_overrides[get_db] = override_db
    return TestClient(app)


def seed_profile_data(db: Session) -> None:
    """Insert the shared fixture data and commit it.

    The fixture consists of one organization unit, three employees in that
    unit (``emp-main`` plus two peers), five expense claims (three for
    ``emp-main``, one per peer) and a single successful agent run with one
    tool call attributed to the main employee's email.
    """
    org = OrganizationUnit(
        id="dept-sales",
        unit_code="SALES",
        name="市场部",
        unit_type="department",
    )
    employee = Employee(
        id="emp-main",
        employee_no="E1001",
        name="张三",
        email="zhangsan@example.com",
        position="客户经理",
        grade="P5",
        organization_unit=org,
    )
    peer_a = Employee(
        id="emp-peer-a",
        employee_no="E1002",
        name="李四",
        email="lisi@example.com",
        position="客户经理",
        grade="P5",
        organization_unit=org,
    )
    peer_b = Employee(
        id="emp-peer-b",
        employee_no="E1003",
        name="王五",
        email="wangwu@example.com",
        position="客户经理",
        grade="P5",
        organization_unit=org,
    )
    db.add_all([org, employee, peer_a, peer_b])

    now = datetime.now(UTC)
    claims = [
        # The main employee has larger claims, one without an invoice and one
        # that was returned; the peers each have a single smaller claim.
        _build_claim(
            "claim-main-1",
            employee.id,
            employee.name,
            Decimal("5000"),
            now - timedelta(days=5),
            missing_attachment=True,
        ),
        _build_claim(
            "claim-main-2",
            employee.id,
            employee.name,
            Decimal("4200"),
            now - timedelta(days=15),
            returned=True,
        ),
        _build_claim(
            "claim-main-3", employee.id, employee.name, Decimal("3800"), now - timedelta(days=30)
        ),
        _build_claim(
            "claim-peer-a", peer_a.id, peer_a.name, Decimal("1200"), now - timedelta(days=12)
        ),
        _build_claim(
            "claim-peer-b", peer_b.id, peer_b.name, Decimal("1500"), now - timedelta(days=18)
        ),
    ]
    db.add_all(claims)
    db.add(
        AgentRun(
            run_id="run-main-1",
            agent="hermes",
            source="user_message",
            user_id=employee.email,
            status="success",
            result_summary="AI 已辅助生成报销说明。",
            started_at=now - timedelta(days=2),
            tool_calls=[
                AgentToolCall(
                    run_id="run-main-1",
                    tool_type="expense",
                    tool_name="claim_draft",
                    request_json={"message": "出差报销"},
                    response_json={"ok": True},
                    status="success",
                    duration_ms=120,
                )
            ],
        )
    )
    db.commit()


def _build_claim(
    claim_id: str,
    employee_id: str,
    employee_name: str,
    amount: Decimal,
    occurred_at: datetime,
    *,
    missing_attachment: bool = False,
    returned: bool = False,
) -> ExpenseClaim:
    """Build an unsaved travel expense claim with exactly one line item.

    Args:
        claim_id: Primary key of the claim; also used to derive the claim
            number, item id and invoice id.
        employee_id: Id of the claiming employee.
        employee_name: Display name of the claiming employee.
        amount: Amount used for both the claim and its single item.
        occurred_at: Timestamp used for both ``occurred_at`` and
            ``submitted_at``.
        missing_attachment: When true, the claim has ``invoice_count`` 0 and
            its item carries no invoice id.
        returned: When true, the claim status is ``"returned"`` and a
            ``manual_return`` risk flag is attached.

    Returns:
        The constructed ``ExpenseClaim`` (not yet added to a session).
    """
    invoice_id = None if missing_attachment else f"invoice-{claim_id}"
    return ExpenseClaim(
        id=claim_id,
        claim_no=f"EXP-{claim_id}",
        employee_id=employee_id,
        employee_name=employee_name,
        department_id="dept-sales",
        department_name="市场部",
        project_code="PRJ-001",
        expense_type="travel",
        reason="客户拜访出差",
        location="北京",
        amount=amount,
        currency="CNY",
        invoice_count=0 if missing_attachment else 1,
        occurred_at=occurred_at,
        submitted_at=occurred_at,
        status="returned" if returned else "submitted",
        approval_stage="直属领导审批",
        risk_flags_json=[{"source": "manual_return", "message": "补充票据"}] if returned else [],
        items=[
            ExpenseClaimItem(
                id=f"item-{claim_id}",
                claim_id=claim_id,
                # NOTE(review): the item date is today's local date, not
                # derived from ``occurred_at`` — confirm this is intended.
                item_date=date.today(),
                item_type="travel",
                item_reason="客户拜访出差",
                item_location="北京",
                item_amount=amount,
                invoice_id=invoice_id,
            )
        ],
    )


def add_closed_user_session(
    db: Session,
    *,
    session_id: str,
    username: str,
    display_name: str = "",
    employee_no: str = "",
    duration_ms: int = 30 * 60 * 1000,
) -> None:
    """Insert and commit a closed user session that ended one minute ago.

    Args:
        db: Session used to persist the metric row.
        session_id: Identifier of the session row.
        username: Login name; also stored as ``email`` when it contains "@".
        display_name: Display name; falls back to ``username`` when empty.
        employee_no: Employee number stored on the row.
        duration_ms: Session length; the login time is computed by
            subtracting it from the logout time.
    """
    logout_at = datetime.now(UTC) - timedelta(minutes=1)
    login_at = logout_at - timedelta(milliseconds=duration_ms)
    db.add(
        UserSessionMetric(
            session_id=session_id,
            username=username,
            display_name=display_name or username,
            employee_no=employee_no,
            email=username if "@" in username else "",
            login_at=login_at,
            logout_at=logout_at,
            last_activity_at=logout_at,
            duration_ms=duration_ms,
            activity_event_count=8,
            logout_reason="manual",
            status="closed",
        )
    )
    db.commit()


def test_service_scans_snapshots_and_filters_approval_scene() -> None:
    """Scanning writes snapshots; the approval scene exposes two profile types."""
    session_factory = build_session_factory()
    with session_factory() as db:
        seed_profile_data(db)

        summary = HermesEmployeeProfileScannerService(db).scan_employee_profiles(log_id=None)

        assert summary["target_employee_count"] >= 1
        assert db.query(EmployeeBehaviorProfileSnapshot).count() >= 4

        latest = EmployeeBehaviorProfileService(db).get_latest_profile(
            employee_id="emp-main",
            scene="approval",
            claim_id="claim-main-1",
            window_days=90,
            expense_type_scope="travel",
        )

        assert latest.employee_id == "emp-main"
        assert {item.profile_type for item in latest.profiles} == {"expense", "process_quality"}
        assert latest.review_priority_score > 0
        assert latest.peer_group.sample_size >= 3
        assert latest.profile_tags
        assert latest.radar.dimensions


def test_service_resolves_latest_profile_by_employee_name_identifier() -> None:
    """Passing the employee's name as ``employee_id`` resolves to ``emp-main``."""
    session_factory = build_session_factory()
    with session_factory() as db:
        seed_profile_data(db)
        employee = db.get(Employee, "emp-main")
        assert employee is not None

        latest = EmployeeBehaviorProfileService(db).get_latest_profile(
            employee_id=employee.name,
            scene="approval",
            claim_id="claim-main-1",
            window_days=90,
            expense_type_scope="travel",
        )

        assert latest.employee_id == "emp-main"
        assert latest.empty_reason == ""
        assert {item.profile_type for item in latest.profiles} == {"expense", "process_quality"}


def test_latest_profile_endpoint_returns_approval_payload() -> None:
    """The per-employee endpoint returns the approval-scene payload."""
    session_factory = build_session_factory()
    with session_factory() as db:
        seed_profile_data(db)

    client = _build_client(session_factory)
    response = client.get(
        "/api/v1/employee-profiles/emp-main/latest",
        params={
            "scene": "approval",
            "claim_id": "claim-main-1",
            "window_days": 90,
            "expense_type_scope": "travel",
        },
        headers={"x-auth-username": "auditor", "x-auth-name": "auditor"},
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["employee_id"] == "emp-main"
    assert {item["profile_type"] for item in payload["profiles"]} == {"expense", "process_quality"}
    assert payload["review_priority_score"] >= 0
    assert payload["profile_tags"]
    assert {item["code"] for item in payload["radar"]["dimensions"]} == {
        "expense_intensity",
        "application_rhythm",
        "travel_entertainment",
        "material_completeness",
        "process_pressure",
    }


def test_current_employee_profile_endpoint_resolves_login_user() -> None:
    """``/me/latest`` maps the logged-in username to the seeded employee."""
    session_factory = build_session_factory()
    with session_factory() as db:
        seed_profile_data(db)
        EmployeeBehaviorProfileService(db).refresh_employee_profiles(
            employee_id="emp-main",
            window_days=(90,),
            expense_type_scope="overall",
        )
        add_closed_user_session(
            db,
            session_id="session-employee-current",
            username="zhangsan@example.com",
            display_name="张三",
            employee_no="E1001",
        )

    client = _build_client(session_factory)
    response = client.get(
        "/api/v1/employee-profiles/me/latest",
        params={
            "scene": "operations",
            "window_days": 90,
            "expense_type_scope": "overall",
        },
        headers={"x-auth-username": "zhangsan@example.com"},
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["employee_id"] == "emp-main"
    assert {item["profile_type"] for item in payload["profiles"]} >= {"expense", "ai_usage"}
    ai_profile = next(item for item in payload["profiles"] if item["profile_type"] == "ai_usage")
    assert ai_profile["metrics"]["ai_run_duration_ms"] == 120
    assert ai_profile["metrics"]["online_duration_ms"] == 30 * 60 * 1000
    assert ai_profile["metrics"]["usage_duration_ms"] == 30 * 60 * 1000
    assert ai_profile["metrics"]["usage_duration_mode"] == "online_session"
    assert payload["profile_tags"]
    assert payload["radar"]["dimensions"]


def test_current_admin_profile_endpoint_returns_account_usage_profile() -> None:
    """An admin without an employee record gets an ``ai_usage``-only profile."""
    session_factory = build_session_factory()
    with session_factory() as db:
        seed_profile_data(db)
        now = datetime.now(UTC)
        # Twelve runs of two seconds each: 12 * 2000 ms = 24000 ms in total.
        for index in range(12):
            run_id = f"run-admin-usage-{index}"
            started_at = now - timedelta(days=1, minutes=index)
            db.add(
                AgentRun(
                    run_id=run_id,
                    agent="user_agent",
                    source="user_message",
                    user_id="admin",
                    status="success",
                    result_summary="管理员查看运行概览。",
                    started_at=started_at,
                    finished_at=started_at + timedelta(seconds=2),
                    tool_calls=[
                        AgentToolCall(
                            run_id=run_id,
                            tool_type="database",
                            tool_name="agent_runs.list",
                            request_json={"limit": 20},
                            response_json={"ok": True},
                            status="success",
                            duration_ms=120,
                        )
                    ],
                )
            )
        db.commit()

    client = _build_client(session_factory)
    response = client.get(
        "/api/v1/employee-profiles/me/latest",
        params={
            "scene": "operations",
            "window_days": 90,
            "expense_type_scope": "overall",
        },
        headers={"x-auth-username": "admin", "x-auth-name": "admin", "x-auth-is-admin": "true"},
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["employee_id"] == "admin"
    assert payload["empty_reason"] == ""
    assert [item["profile_type"] for item in payload["profiles"]] == ["ai_usage"]
    metrics = payload["profiles"][0]["metrics"]
    assert metrics["ai_run_count"] == 12
    assert metrics["ai_run_duration_ms"] == 24000
    assert payload["profile_tags"]
    assert payload["radar"]["dimensions"]


def test_current_admin_profile_endpoint_uses_online_session_without_agent_runs() -> None:
    """With no agent runs, usage duration comes from the closed online session."""
    session_factory = build_session_factory()
    with session_factory() as db:
        add_closed_user_session(
            db,
            session_id="session-admin-online",
            username="admin",
            display_name="admin",
            duration_ms=12 * 60 * 1000,
        )

    client = _build_client(session_factory)
    response = client.get(
        "/api/v1/employee-profiles/me/latest",
        params={
            "scene": "operations",
            "window_days": 90,
            "expense_type_scope": "overall",
        },
        headers={"x-auth-username": "admin", "x-auth-name": "admin", "x-auth-is-admin": "true"},
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["employee_id"] == "admin"
    assert payload["empty_reason"] == ""
    metrics = payload["profiles"][0]["metrics"]
    assert metrics["ai_run_count"] == 0
    assert metrics["online_duration_ms"] == 12 * 60 * 1000
    assert metrics["usage_duration_ms"] == 12 * 60 * 1000
    assert metrics["usage_duration_mode"] == "online_session"


def test_finish_session_endpoint_closes_active_session() -> None:
    """Posting to the finish endpoint closes an active session row."""
    session_factory = build_session_factory()
    login_at = datetime.now(UTC) - timedelta(minutes=9)
    with session_factory() as db:
        db.add(
            UserSessionMetric(
                session_id="session-active-finish",
                username="zhangsan@example.com",
                display_name="张三",
                employee_no="E1001",
                email="zhangsan@example.com",
                login_at=login_at,
                last_activity_at=login_at,
                status="active",
            )
        )
        db.commit()

    client = _build_client(session_factory)
    response = client.post(
        "/api/v1/auth/sessions/session-active-finish/finish",
        json={
            "reason": "manual",
            "lastActivityAt": datetime.now(UTC).isoformat(),
            "activityEventCount": 5,
            "pagePath": "/workbench",
        },
    )

    assert response.status_code == 200
    payload = response.json()
    assert payload["sessionId"] == "session-active-finish"
    assert payload["durationMs"] > 0

    # Re-read through a new session to check what was actually persisted.
    with session_factory() as db:
        stored = db.query(UserSessionMetric).filter_by(session_id="session-active-finish").one()
        assert stored.status == "closed"
        assert stored.activity_event_count == 5


def test_hermes_scheduler_parses_weekly_profile_cron() -> None:
    """``_parse_simple_cron`` handles weekly, daily and malformed expressions."""
    scheduler = HermesScheduler()

    assert scheduler._parse_simple_cron("30 8 * * 1") == (30, 8, 0)
    assert scheduler._parse_simple_cron("0 9 * * *") == (0, 9, None)
    assert scheduler._parse_simple_cron("bad cron") is None