test(agents): expand Code Commander tests to 67 tests

- Phase 1: state, prompts, tools registry (13 tests)
- Phase 2: AI adapters, security classifier, sandbox/executors (54 tests)
  - SecurityClassifier: 21 tests covering classify() with edge cases
  - SandboxEnvironment: 5 tests for create/cleanup/list_files
  - DirectExecutor: 3 tests with mocked subprocess
  - SandboxExecutor: 6 tests with mocked subprocess
- Phase 3: schemas (8 tests)
This commit is contained in:
2026-04-05 18:06:17 +08:00
parent f033fb5879
commit f0658201e5

View File

@@ -2,9 +2,12 @@
Tests for Code Commander module (Phases 1-3) Tests for Code Commander module (Phases 1-3)
""" """
import asyncio
import sys import sys
import tempfile
from pathlib import Path
from types import SimpleNamespace from types import SimpleNamespace
from unittest.mock import Mock from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest import pytest
@@ -39,6 +42,11 @@ from app.agents.tools.security_classifier import (
SecurityClassifier, SecurityClassifier,
) )
from app.agents.tools.direct_executor import DirectExecutor from app.agents.tools.direct_executor import DirectExecutor
from app.agents.tools.sandbox_executor import (
SandboxExecutor,
SandboxEnvironment,
ExecutionResult as SandboxExecutionResult,
)
from app.agents.schemas.task import ( from app.agents.schemas.task import (
CodeProviderType, CodeProviderType,
RiskLevelType, RiskLevelType,
@@ -171,51 +179,331 @@ class TestPhase2AIAdapter:
assert result.error is None assert result.error is None
assert result.exit_code == 0 assert result.exit_code == 0
def test_claude_adapter_build_command(self):
    """Check that "claude", "-p" and the prompt are each ``in`` the built command."""
    workspace = Path("/tmp/workspace")
    command = ClaudeAdapter().build_command("test prompt", workspace)
    for expected in ("claude", "-p", "test prompt"):
        assert expected in command
def test_gemini_adapter_build_command(self):
    """Check that "gemini" and "-p" are each ``in`` a command built without a workspace."""
    command = GeminiAdapter().build_command("test prompt", None)
    for expected in ("gemini", "-p"):
        assert expected in command
class TestPhase2SecurityClassifier:
    """Comprehensive SecurityClassifier tests.

    Covers ``classify()`` for prompts expected to be LOW or HIGH risk,
    ``classify()`` with a ``target_path``, the flags returned by
    ``get_risk_factors()``, and a couple of edge cases.
    """

    def test_risk_level_enum_values(self):
        """RiskLevel members expose the string values "low" and "high"."""
        assert RiskLevel.LOW.value == "low"
        assert RiskLevel.HIGH.value == "high"

    # ------------------------------------------------------------------
    # LOW risk tests
    # ------------------------------------------------------------------

    def test_classify_low_risk_demo(self):
        classifier = SecurityClassifier()
        result = classifier.classify("写一个贪食蛇 demo")
        assert result == RiskLevel.LOW

    def test_classify_low_risk_example(self):
        classifier = SecurityClassifier()
        result = classifier.classify("给我一个代码示例")
        assert result == RiskLevel.LOW

    def test_classify_low_risk_small_game(self):
        classifier = SecurityClassifier()
        result = classifier.classify("创建一个俄罗斯方块小游戏")
        assert result == RiskLevel.LOW

    def test_classify_low_risk_new_project(self):
        classifier = SecurityClassifier()
        result = classifier.classify("帮我创建一个新项目")
        assert result == RiskLevel.LOW

    def test_classify_low_risk_write_file(self):
        classifier = SecurityClassifier()
        result = classifier.classify("写一个 hello world")
        assert result == RiskLevel.LOW

    def test_classify_low_risk_generate_code(self):
        classifier = SecurityClassifier()
        # "代码示例" is explicitly in LOW_RISK_KEYWORDS
        # NOTE(review): this is the same prompt as
        # test_classify_low_risk_example, so the two tests currently
        # exercise an identical input.
        result = classifier.classify("给我一个代码示例")
        assert result == RiskLevel.LOW

    # ------------------------------------------------------------------
    # HIGH risk tests
    # ------------------------------------------------------------------

    def test_classify_high_risk_delete(self):
        classifier = SecurityClassifier()
        result = classifier.classify("删除所有文件 rm -rf")
        assert result == RiskLevel.HIGH

    def test_classify_high_risk_format(self):
        classifier = SecurityClassifier()
        result = classifier.classify("格式化硬盘 sudo mkfs")
        assert result == RiskLevel.HIGH

    def test_classify_high_risk_modify_project(self):
        classifier = SecurityClassifier()
        result = classifier.classify("修改 backend 代码")
        assert result == RiskLevel.HIGH

    def test_classify_high_risk_edit_config(self):
        classifier = SecurityClassifier()
        result = classifier.classify("编辑 .env 配置文件")
        assert result == RiskLevel.HIGH

    def test_classify_high_risk_git_push(self):
        classifier = SecurityClassifier()
        result = classifier.classify("git push 到远程")
        assert result == RiskLevel.HIGH

    def test_classify_high_risk_sudo(self):
        classifier = SecurityClassifier()
        result = classifier.classify("sudo chmod 777")
        assert result == RiskLevel.HIGH

    # ------------------------------------------------------------------
    # Project path tests
    # ------------------------------------------------------------------

    def test_classify_high_risk_jarvis_path(self):
        classifier = SecurityClassifier()
        result = classifier.classify("修改代码", target_path="backend/jarvis")
        assert result == RiskLevel.HIGH

    def test_classify_high_risk_git_path(self):
        classifier = SecurityClassifier()
        result = classifier.classify("修改代码", target_path=".git/config")
        assert result == RiskLevel.HIGH

    # ------------------------------------------------------------------
    # get_risk_factors tests
    # ------------------------------------------------------------------

    def test_get_risk_factors_high_risk(self):
        classifier = SecurityClassifier()
        factors = classifier.get_risk_factors("删除系统文件")
        assert factors["has_high_risk_keywords"] is True
        assert factors["has_low_risk_keywords"] is False

    def test_get_risk_factors_low_risk(self):
        classifier = SecurityClassifier()
        factors = classifier.get_risk_factors("写一个 demo")
        assert factors["has_low_risk_keywords"] is True
        assert factors["has_high_risk_keywords"] is False

    def test_get_risk_factors_project_path(self):
        classifier = SecurityClassifier()
        factors = classifier.get_risk_factors("修改代码", target_path="backend/app")
        assert factors["is_project_path"] is True

    def test_get_risk_factors_non_project_path(self):
        classifier = SecurityClassifier()
        factors = classifier.get_risk_factors("修改代码", target_path="/tmp/foo")
        assert factors["is_project_path"] is False

    # ------------------------------------------------------------------
    # Edge cases
    # ------------------------------------------------------------------

    def test_classify_empty_string(self):
        classifier = SecurityClassifier()
        result = classifier.classify("")
        # Empty string should default to HIGH (conservative policy)
        assert result == RiskLevel.HIGH

    def test_classify_chinese_edit_keywords(self):
        classifier = SecurityClassifier()
        result = classifier.classify("编辑这个文件")
        assert result == RiskLevel.HIGH
class TestPhase2SandboxEnvironment:
    """SandboxEnvironment tests.

    Each test creates a sandbox through ``SandboxEnvironment.create()``.
    Cleanup runs in a ``finally`` clause so that a failing assertion does
    not leave the sandbox workspace behind.
    """

    @pytest.mark.asyncio
    async def test_create_sandbox_environment(self):
        """A freshly created sandbox has an existing workspace and a non-empty session id."""
        env = await SandboxEnvironment.create()
        try:
            assert env.workspace_path.exists()
            assert env.session_id is not None
            assert len(env.session_id) > 0
        finally:
            await env.cleanup()

    @pytest.mark.asyncio
    async def test_create_sandbox_with_custom_prefix(self):
        """``create()`` accepts a ``prefix`` keyword and still yields an existing workspace."""
        env = await SandboxEnvironment.create(prefix="test_")
        try:
            assert env.workspace_path.exists()
        finally:
            await env.cleanup()

    @pytest.mark.asyncio
    async def test_cleanup_removes_directory(self):
        """After ``cleanup()`` the workspace path no longer exists."""
        env = await SandboxEnvironment.create()
        # Keep our own reference to the path so it is checked after cleanup.
        path = env.workspace_path
        assert path.exists()

        await env.cleanup()
        assert not path.exists()

    @pytest.mark.asyncio
    async def test_list_created_files_empty(self):
        """A new sandbox reports no created files."""
        env = await SandboxEnvironment.create()
        try:
            files = env.list_created_files()
            assert files == []
        finally:
            await env.cleanup()

    @pytest.mark.asyncio
    async def test_list_created_files_with_files(self):
        """A file written into the workspace shows up in ``list_created_files()``."""
        env = await SandboxEnvironment.create()
        try:
            # Create a test file
            test_file = env.workspace_path / "test.py"
            test_file.write_text("print('hello')")

            files = env.list_created_files()
            assert "test.py" in files
        finally:
            await env.cleanup()
class TestPhase2DirectExecutor:
    """DirectExecutor tests with mocked subprocess.

    No real CLI is launched: ``asyncio.create_subprocess_exec`` is patched
    and ``adapter.is_installed`` is replaced with a mock, so the results do
    not depend on which tools are installed on the host.
    """

    @staticmethod
    def _fake_process(*stdout_lines):
        """Build a mocked subprocess.

        ``stdout.readline`` yields ``stdout_lines`` in order and then ``b""``;
        ``communicate`` returns two empty byte strings; ``returncode`` is 0.
        """
        process = AsyncMock()
        process.stdout.readline = AsyncMock(side_effect=[*stdout_lines, b""])
        process.communicate = AsyncMock(return_value=(b"", b""))
        process.returncode = 0
        return process

    @pytest.mark.asyncio
    async def test_execute_not_installed(self):
        """With the adapter reporting "not installed", the output carries an error marker."""
        adapter = ClaudeAdapter()
        # Mock is_installed to return False
        adapter.is_installed = MagicMock(return_value=False)
        executor = DirectExecutor(adapter)

        output = "".join([line async for line in executor.execute("test prompt")])

        assert "[ERROR]" in output
        assert "not installed" in output

    @pytest.mark.asyncio
    async def test_execute_with_mocked_process(self):
        """Lines read from the mocked process stdout appear in the streamed output."""
        adapter = GeminiAdapter()  # Gemini doesn't require workspace
        # Pin the install check so the test does not depend on the host having
        # the CLI (presumably the same check exercised by
        # test_execute_not_installed -- confirm against DirectExecutor).
        adapter.is_installed = MagicMock(return_value=True)
        executor = DirectExecutor(adapter, timeout=5)

        # Mock subprocess
        mock_process = self._fake_process(b"Hello World\n")

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            output = "".join(
                [line async for line in executor.execute("test prompt")]
            )

        assert "Hello World" in output

    @pytest.mark.asyncio
    async def test_execute_sync_returns_result(self):
        """``execute_sync`` returns a result whose ``stdout`` contains the process output."""
        adapter = GeminiAdapter()
        # See test_execute_with_mocked_process: keep the test host-independent.
        adapter.is_installed = MagicMock(return_value=True)
        executor = DirectExecutor(adapter, timeout=5)

        mock_process = self._fake_process(b"output line\n")

        with patch("asyncio.create_subprocess_exec", return_value=mock_process):
            result = await executor.execute_sync("test")

        assert result.stdout is not None
        assert "output line" in result.stdout
class TestPhase2SandboxExecutor:
    """SandboxExecutor tests with mocked subprocess.

    Sandboxes created during a test are released in ``finally`` clauses so
    that a failing assertion does not leave workspace directories behind.
    """

    @pytest.mark.asyncio
    async def test_sandbox_executor_initialization(self):
        """A new executor keeps its adapter, has timeout 300 and no sessions."""
        adapter = ClaudeAdapter()
        executor = SandboxExecutor(adapter)
        assert executor.adapter is adapter
        assert executor.timeout == 300
        assert executor._sessions == {}

    @pytest.mark.asyncio
    async def test_sandbox_executor_custom_timeout(self):
        """The ``timeout`` keyword is stored on the executor."""
        adapter = ClaudeAdapter()
        executor = SandboxExecutor(adapter, timeout=600)
        assert executor.timeout == 600

    @pytest.mark.asyncio
    async def test_execute_creates_sandbox(self):
        """Streaming ``execute()`` with a mocked process yields its output or an exit-code marker."""
        adapter = ClaudeAdapter()
        # Pin the install check so the result does not depend on the host
        # having the CLI (assumes the executor consults adapter.is_installed
        # -- confirm against SandboxExecutor).
        adapter.is_installed = MagicMock(return_value=True)
        executor = SandboxExecutor(adapter, timeout=5)

        mock_process = AsyncMock()
        mock_process.stdout.readline = AsyncMock(
            side_effect=[
                b"Creating sandbox files...\n",
                b"",
            ]
        )
        mock_process.communicate = AsyncMock(return_value=(b"", b""))
        mock_process.returncode = 0

        try:
            with patch("asyncio.create_subprocess_exec", return_value=mock_process):
                output = "".join(
                    [line async for line in executor.execute("test prompt")]
                )
            assert "Creating sandbox files" in output or "[EXIT_CODE]" in output
        finally:
            # Release any sandbox still registered on the executor; this is a
            # no-op when execute() left nothing in _sessions.
            for session_id in list(executor._sessions):
                await executor.cleanup_session(session_id)

    @pytest.mark.asyncio
    async def test_cleanup_session_removes_sandbox(self):
        """``cleanup_session`` returns True, forgets the session and removes its directory."""
        adapter = ClaudeAdapter()
        executor = SandboxExecutor(adapter, timeout=5)

        # Create a sandbox first
        env = await SandboxEnvironment.create()
        executor._sessions[env.session_id] = env

        result = await executor.cleanup_session(env.session_id)
        assert result is True
        assert env.session_id not in executor._sessions
        # Directory should be cleaned up
        assert not env.workspace_path.exists()

    @pytest.mark.asyncio
    async def test_cleanup_session_unknown_id(self):
        """Cleaning up an id that was never registered returns False."""
        adapter = ClaudeAdapter()
        executor = SandboxExecutor(adapter)
        result = await executor.cleanup_session("unknown_id")
        assert result is False

    @pytest.mark.asyncio
    async def test_get_session(self):
        """``get_session`` returns the exact environment object stored under the id."""
        adapter = ClaudeAdapter()
        executor = SandboxExecutor(adapter)

        env = await SandboxEnvironment.create()
        try:
            executor._sessions[env.session_id] = env
            retrieved = executor.get_session(env.session_id)
            assert retrieved is env
        finally:
            # Cleanup
            await env.cleanup()

    @pytest.mark.asyncio
    async def test_get_session_unknown(self):
        """``get_session`` returns None for an id that was never registered."""
        adapter = ClaudeAdapter()
        executor = SandboxExecutor(adapter)
        result = executor.get_session("unknown")
        assert result is None
# ============================================================================= # =============================================================================
@@ -252,6 +540,19 @@ class TestPhase3Schemas:
assert task.user_prompt == "写一个贪食蛇" assert task.user_prompt == "写一个贪食蛇"
assert task.task_id is not None assert task.task_id is not None
def test_code_task_with_parent(self):
    """A CodeTask built with parent/thread ids and sandbox mode exposes them unchanged."""
    fields = dict(
        task_type=CodeTaskType.PROJECT,
        ai_provider=CodeProviderType.GEMINI,
        sandbox_mode=True,
        user_prompt="修改项目",
        parent_task_id="parent-123",
        thread_id="thread-456",
    )
    task = CodeTask(**fields)

    assert (task.parent_task_id, task.thread_id) == ("parent-123", "thread-456")
    assert task.sandbox_mode is True
def test_code_execution_result_schema(self): def test_code_execution_result_schema(self):
result = CodeExecutionResultSchema( result = CodeExecutionResultSchema(
success=True, success=True,
@@ -263,3 +564,24 @@ class TestPhase3Schemas:
assert result.message == "执行成功" assert result.message == "执行成功"
assert result.output == "Hello World" assert result.output == "Hello World"
assert result.files_created == ["main.py"] assert result.files_created == ["main.py"]
def test_code_execution_result_schema_with_error(self):
    """A failed result keeps its success flag, error text and exit code."""
    failure = CodeExecutionResultSchema(
        exit_code=1,
        error="Command not found",
        message="执行失败",
        success=False,
    )

    assert failure.success is False
    assert (failure.error, failure.exit_code) == ("Command not found", 1)
def test_code_execution_result_schema_with_time(self):
    """A result keeps the execution time and sandbox session id it was built with."""
    timed = CodeExecutionResultSchema(
        sandbox_session_id="sandbox-123",
        execution_time=1.5,
        message="完成",
        success=True,
    )

    assert (timed.execution_time, timed.sandbox_session_id) == (1.5, "sandbox-123")