diff --git a/backend/tests/backend/app/agents/test_code_commander.py b/backend/tests/backend/app/agents/test_code_commander.py index d6abe6e..dceeb9c 100644 --- a/backend/tests/backend/app/agents/test_code_commander.py +++ b/backend/tests/backend/app/agents/test_code_commander.py @@ -2,9 +2,12 @@ Tests for Code Commander module (Phases 1-3) """ +import asyncio import sys +import tempfile +from pathlib import Path from types import SimpleNamespace -from unittest.mock import Mock +from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest @@ -39,6 +42,11 @@ from app.agents.tools.security_classifier import ( SecurityClassifier, ) from app.agents.tools.direct_executor import DirectExecutor +from app.agents.tools.sandbox_executor import ( + SandboxExecutor, + SandboxEnvironment, + ExecutionResult as SandboxExecutionResult, +) from app.agents.schemas.task import ( CodeProviderType, RiskLevelType, @@ -171,51 +179,331 @@ class TestPhase2AIAdapter: assert result.error is None assert result.exit_code == 0 + def test_claude_adapter_build_command(self): + adapter = ClaudeAdapter() + cmd = adapter.build_command("test prompt", Path("/tmp/workspace")) + assert "claude" in cmd + assert "-p" in cmd + assert "test prompt" in cmd + + def test_gemini_adapter_build_command(self): + adapter = GeminiAdapter() + cmd = adapter.build_command("test prompt", None) + assert "gemini" in cmd + assert "-p" in cmd + class TestPhase2SecurityClassifier: + """Comprehensive SecurityClassifier tests""" + def test_risk_level_enum_values(self): assert RiskLevel.LOW.value == "low" assert RiskLevel.HIGH.value == "high" - def test_security_classifier_classify_low_risk_demo(self): + # LOW risk tests + def test_classify_low_risk_demo(self): classifier = SecurityClassifier() result = classifier.classify("写一个贪食蛇 demo") assert result == RiskLevel.LOW - def test_security_classifier_classify_low_risk_simple(self): + def test_classify_low_risk_example(self): classifier = SecurityClassifier() - result = classifier.classify("帮我写一个 hello world") + result = classifier.classify("给我一个代码示例") assert result == RiskLevel.LOW - def test_security_classifier_classify_high_risk_dangerous(self): + def test_classify_low_risk_small_game(self): + classifier = SecurityClassifier() + result = classifier.classify("创建一个俄罗斯方块小游戏") + assert result == RiskLevel.LOW + + def test_classify_low_risk_new_project(self): + classifier = SecurityClassifier() + result = classifier.classify("帮我创建一个新项目") + assert result == RiskLevel.LOW + + def test_classify_low_risk_write_file(self): + classifier = SecurityClassifier() + result = classifier.classify("写一个 hello world") + assert result == RiskLevel.LOW + + def test_classify_low_risk_generate_code(self): + classifier = SecurityClassifier() + # "代码示例" is explicitly in LOW_RISK_KEYWORDS + result = classifier.classify("给我一个代码示例") + assert result == RiskLevel.LOW + + # HIGH risk tests + def test_classify_high_risk_delete(self): classifier = SecurityClassifier() result = classifier.classify("删除所有文件 rm -rf") assert result == RiskLevel.HIGH - def test_security_classifier_classify_high_risk_format(self): + def test_classify_high_risk_format(self): classifier = SecurityClassifier() result = classifier.classify("格式化硬盘 sudo mkfs") assert result == RiskLevel.HIGH - def test_security_classifier_get_risk_factors(self): + def test_classify_high_risk_modify_project(self): + classifier = SecurityClassifier() + result = classifier.classify("修改 backend 代码") + assert result == RiskLevel.HIGH + + def test_classify_high_risk_edit_config(self): + classifier = SecurityClassifier() + result = classifier.classify("编辑 .env 配置文件") + assert result == RiskLevel.HIGH + + def test_classify_high_risk_git_push(self): + classifier = SecurityClassifier() + result = classifier.classify("git push 到远程") + assert result == RiskLevel.HIGH + + def test_classify_high_risk_sudo(self): + classifier = SecurityClassifier() + result = classifier.classify("sudo chmod 777") + assert result == RiskLevel.HIGH + + # Project path tests + def test_classify_high_risk_jarvis_path(self): + classifier = SecurityClassifier() + result = classifier.classify("修改代码", target_path="backend/jarvis") + assert result == RiskLevel.HIGH + + def test_classify_high_risk_git_path(self): + classifier = SecurityClassifier() + result = classifier.classify("修改代码", target_path=".git/config") + assert result == RiskLevel.HIGH + + # get_risk_factors tests + def test_get_risk_factors_high_risk(self): classifier = SecurityClassifier() factors = classifier.get_risk_factors("删除系统文件") - assert len(factors) > 0 + assert factors["has_high_risk_keywords"] is True + assert factors["has_low_risk_keywords"] is False + + def test_get_risk_factors_low_risk(self): + classifier = SecurityClassifier() + factors = classifier.get_risk_factors("写一个 demo") + assert factors["has_low_risk_keywords"] is True + assert factors["has_high_risk_keywords"] is False + + def test_get_risk_factors_project_path(self): + classifier = SecurityClassifier() + factors = classifier.get_risk_factors("修改代码", target_path="backend/app") + assert factors["is_project_path"] is True + + def test_get_risk_factors_non_project_path(self): + classifier = SecurityClassifier() + factors = classifier.get_risk_factors("修改代码", target_path="/tmp/foo") + assert factors["is_project_path"] is False + + # Edge cases + def test_classify_empty_string(self): + classifier = SecurityClassifier() + result = classifier.classify("") + # Empty string should default to HIGH (保守策略) + assert result == RiskLevel.HIGH + + def test_classify_chinese_edit_keywords(self): + classifier = SecurityClassifier() + result = classifier.classify("编辑这个文件") + assert result == RiskLevel.HIGH + + +class TestPhase2SandboxEnvironment: + """SandboxEnvironment tests""" + + @pytest.mark.asyncio + async def test_create_sandbox_environment(self): + env = await SandboxEnvironment.create() + assert env.workspace_path.exists() + assert env.session_id is not None + assert len(env.session_id) > 0 + # Cleanup + await env.cleanup() + + @pytest.mark.asyncio + async def test_create_sandbox_with_custom_prefix(self): + env = await SandboxEnvironment.create(prefix="test_") + assert env.workspace_path.exists() + # Cleanup + await env.cleanup() + + @pytest.mark.asyncio + async def test_cleanup_removes_directory(self): + env = await SandboxEnvironment.create() + path = env.workspace_path + assert path.exists() + await env.cleanup() + assert not path.exists() + + @pytest.mark.asyncio + async def test_list_created_files_empty(self): + env = await SandboxEnvironment.create() + try: + files = env.list_created_files() + assert files == [] + finally: + await env.cleanup() + + @pytest.mark.asyncio + async def test_list_created_files_with_files(self): + env = await SandboxEnvironment.create() + try: + # Create a test file + test_file = env.workspace_path / "test.py" + test_file.write_text("print('hello')") + files = env.list_created_files() + assert "test.py" in files + finally: + await env.cleanup() class TestPhase2DirectExecutor: - def test_direct_executor_initialization(self): - adapter = ClaudeAdapter() - executor = DirectExecutor(adapter) - assert executor is not None - assert executor.adapter is adapter + """DirectExecutor tests with mocked subprocess""" - def test_direct_executor_is_installed_returns_bool(self): + @pytest.mark.asyncio + async def test_execute_not_installed(self): adapter = ClaudeAdapter() + # Mock is_installed to return False + adapter.is_installed = MagicMock(return_value=False) executor = DirectExecutor(adapter) - # is_installed is on the adapter, not the executor - result = executor.adapter.is_installed() - assert isinstance(result, bool) + + output_parts = [] + async for line in executor.execute("test prompt"): + output_parts.append(line) + + output = "".join(output_parts) + assert "[ERROR]" in output + assert "not installed" in output + + @pytest.mark.asyncio + async def test_execute_with_mocked_process(self): + adapter = GeminiAdapter() # Gemini doesn't require workspace + executor = DirectExecutor(adapter, timeout=5) + + # Mock subprocess + mock_process = AsyncMock() + mock_process.stdout.readline = AsyncMock( + side_effect=[ + b"Hello World\n", + b"", + ] + ) + mock_process.communicate = AsyncMock(return_value=(b"", b"")) + mock_process.returncode = 0 + + with patch("asyncio.create_subprocess_exec", return_value=mock_process): + output_parts = [] + async for line in executor.execute("test prompt"): + output_parts.append(line) + + output = "".join(output_parts) + assert "Hello World" in output + + @pytest.mark.asyncio + async def test_execute_sync_returns_result(self): + adapter = GeminiAdapter() + executor = DirectExecutor(adapter, timeout=5) + + mock_process = AsyncMock() + mock_process.stdout.readline = AsyncMock( + side_effect=[ + b"output line\n", + b"", + ] + ) + mock_process.communicate = AsyncMock(return_value=(b"", b"")) + mock_process.returncode = 0 + + with patch("asyncio.create_subprocess_exec", return_value=mock_process): + result = await executor.execute_sync("test") + assert result.stdout is not None + assert "output line" in result.stdout + + +class TestPhase2SandboxExecutor: + """SandboxExecutor tests with mocked subprocess""" + + @pytest.mark.asyncio + async def test_sandbox_executor_initialization(self): + adapter = ClaudeAdapter() + executor = SandboxExecutor(adapter) + assert executor.adapter is adapter + assert executor.timeout == 300 + assert executor._sessions == {} + + @pytest.mark.asyncio + async def test_sandbox_executor_custom_timeout(self): + adapter = ClaudeAdapter() + executor = SandboxExecutor(adapter, timeout=600) + assert executor.timeout == 600 + + @pytest.mark.asyncio + async def test_execute_creates_sandbox(self): + adapter = ClaudeAdapter() + executor = SandboxExecutor(adapter, timeout=5) + + mock_process = AsyncMock() + mock_process.stdout.readline = AsyncMock( + side_effect=[ + b"Creating sandbox files...\n", + b"", + ] + ) + mock_process.communicate = AsyncMock(return_value=(b"", b"")) + mock_process.returncode = 0 + + with patch("asyncio.create_subprocess_exec", return_value=mock_process): + output_parts = [] + async for line in executor.execute("test prompt"): + output_parts.append(line) + + output = "".join(output_parts) + assert "Creating sandbox files" in output or "[EXIT_CODE]" in output + + @pytest.mark.asyncio + async def test_cleanup_session_removes_sandbox(self): + adapter = ClaudeAdapter() + executor = SandboxExecutor(adapter, timeout=5) + + # Create a sandbox first + env = await SandboxEnvironment.create() + executor._sessions[env.session_id] = env + + result = await executor.cleanup_session(env.session_id) + assert result is True + assert env.session_id not in executor._sessions + # Directory should be cleaned up + assert not env.workspace_path.exists() + + @pytest.mark.asyncio + async def test_cleanup_session_unknown_id(self): + adapter = ClaudeAdapter() + executor = SandboxExecutor(adapter) + result = await executor.cleanup_session("unknown_id") + assert result is False + + @pytest.mark.asyncio + async def test_get_session(self): + adapter = ClaudeAdapter() + executor = SandboxExecutor(adapter) + + env = await SandboxEnvironment.create() + executor._sessions[env.session_id] = env + + retrieved = executor.get_session(env.session_id) + assert retrieved is env + + # Cleanup + await env.cleanup() + + @pytest.mark.asyncio + async def test_get_session_unknown(self): + adapter = ClaudeAdapter() + executor = SandboxExecutor(adapter) + result = executor.get_session("unknown") + assert result is None # ============================================================================= @@ -252,6 +540,19 @@ class TestPhase3Schemas: assert task.user_prompt == "写一个贪食蛇" assert task.task_id is not None + def test_code_task_with_parent(self): + task = CodeTask( + task_type=CodeTaskType.PROJECT, + ai_provider=CodeProviderType.GEMINI, + sandbox_mode=True, + user_prompt="修改项目", + parent_task_id="parent-123", + thread_id="thread-456", + ) + assert task.parent_task_id == "parent-123" + assert task.thread_id == "thread-456" + assert task.sandbox_mode is True + def test_code_execution_result_schema(self): result = CodeExecutionResultSchema( success=True, @@ -263,3 +564,24 @@ class TestPhase3Schemas: assert result.message == "执行成功" assert result.output == "Hello World" assert result.files_created == ["main.py"] + + def test_code_execution_result_schema_with_error(self): + result = CodeExecutionResultSchema( + success=False, + message="执行失败", + error="Command not found", + exit_code=1, + ) + assert result.success is False + assert result.error == "Command not found" + assert result.exit_code == 1 + + def test_code_execution_result_schema_with_time(self): + result = CodeExecutionResultSchema( + success=True, + message="完成", + execution_time=1.5, + sandbox_session_id="sandbox-123", + ) + assert result.execution_time == 1.5 + assert result.sandbox_session_id == "sandbox-123"